You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:00 UTC

[16/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
deleted file mode 100644
index 0248830..0000000
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-
-import spark.TaskState.TaskState
-import spark.scheduler.TaskSet
-
-/**
- * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
- * each task and is responsible for retries on failure and locality. The main interfaces to it
- * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
- * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
- *
- * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
- * (e.g. its event handlers). It should not be called from other threads.
- */
-private[spark] trait TaskSetManager extends Schedulable {
-  def schedulableQueue = null
-  
-  def schedulingMode = SchedulingMode.NONE
-  
-  def taskSet: TaskSet
-
-  def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription]
-
-  def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
-
-  def error(message: String)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
deleted file mode 100644
index 1d09bd9..0000000
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-/**
- * Represents free resources available on an executor.
- */
-private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
deleted file mode 100644
index 5be4dbd..0000000
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.local
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.util.concurrent.atomic.AtomicInteger
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import spark._
-import spark.TaskState.TaskState
-import spark.executor.ExecutorURLClassLoader
-import spark.scheduler._
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-import akka.actor._
-
-/**
- * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
- * the scheduler also allows each task to fail up to maxFailures times, which is useful for
- * testing fault recovery.
- */
-
-private[spark]
-case class LocalReviveOffers()
-
-private[spark]
-case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
-
-private[spark]
-class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
-
-  def receive = {
-    case LocalReviveOffers =>
-      launchTask(localScheduler.resourceOffer(freeCores))
-    case LocalStatusUpdate(taskId, state, serializeData) =>
-      freeCores += 1
-      localScheduler.statusUpdate(taskId, state, serializeData)
-      launchTask(localScheduler.resourceOffer(freeCores))
-  }
-
-  def launchTask(tasks : Seq[TaskDescription]) {
-    for (task <- tasks) {
-      freeCores -= 1
-      localScheduler.threadPool.submit(new Runnable {
-        def run() {
-          localScheduler.runTask(task.taskId, task.serializedTask)
-        }
-      })
-    }
-  }
-}
-
-private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
-  extends TaskScheduler
-  with Logging {
-
-  var attemptId = new AtomicInteger(0)
-  var threadPool = Utils.newDaemonFixedThreadPool(threads)
-  val env = SparkEnv.get
-  var listener: TaskSchedulerListener = null
-
-  // Application dependencies (added through SparkContext) that we've fetched so far on this node.
-  // Each map holds the master's timestamp for the version of that file or JAR we got.
-  val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
-  val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
-
-  val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
-
-  var schedulableBuilder: SchedulableBuilder = null
-  var rootPool: Pool = null
-  val schedulingMode: SchedulingMode = SchedulingMode.withName(
-    System.getProperty("spark.cluster.schedulingmode", "FIFO"))
-  val activeTaskSets = new HashMap[String, TaskSetManager]
-  val taskIdToTaskSetId = new HashMap[Long, String]
-  val taskSetTaskIds = new HashMap[String, HashSet[Long]]
-
-  var localActor: ActorRef = null
-
-  override def start() {
-    // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
-    schedulableBuilder = {
-      schedulingMode match {
-        case SchedulingMode.FIFO =>
-          new FIFOSchedulableBuilder(rootPool)
-        case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool)
-      }
-    }
-    schedulableBuilder.buildPools()
-
-    localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
-  }
-
-  override def setListener(listener: TaskSchedulerListener) {
-    this.listener = listener
-  }
-
-  override def submitTasks(taskSet: TaskSet) {
-    synchronized {
-      val manager = new LocalTaskSetManager(this, taskSet)
-      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
-      activeTaskSets(taskSet.id) = manager
-      taskSetTaskIds(taskSet.id) = new HashSet[Long]()
-      localActor ! LocalReviveOffers
-    }
-  }
-
-  def resourceOffer(freeCores: Int): Seq[TaskDescription] = {
-    synchronized {
-      var freeCpuCores = freeCores
-      val tasks = new ArrayBuffer[TaskDescription](freeCores)
-      val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
-      for (manager <- sortedTaskSetQueue) {
-        logDebug("parentName:%s,name:%s,runningTasks:%s".format(
-          manager.parent.name, manager.name, manager.runningTasks))
-      }
-
-      var launchTask = false
-      for (manager <- sortedTaskSetQueue) {
-        do {
-          launchTask = false
-          manager.resourceOffer(null, null, freeCpuCores, null) match {
-            case Some(task) =>
-              tasks += task
-              taskIdToTaskSetId(task.taskId) = manager.taskSet.id
-              taskSetTaskIds(manager.taskSet.id) += task.taskId
-              freeCpuCores -= 1
-              launchTask = true
-            case None => {}
-          }
-        } while(launchTask)
-      }
-      return tasks
-    }
-  }
-
-  def taskSetFinished(manager: TaskSetManager) {
-    synchronized {
-      activeTaskSets -= manager.taskSet.id
-      manager.parent.removeSchedulable(manager)
-      logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
-      taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
-      taskSetTaskIds -= manager.taskSet.id
-    }
-  }
-
-  def runTask(taskId: Long, bytes: ByteBuffer) {
-    logInfo("Running " + taskId)
-    val info = new TaskInfo(taskId, 0, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
-    // Set the Spark execution environment for the worker thread
-    SparkEnv.set(env)
-    val ser = SparkEnv.get.closureSerializer.newInstance()
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    var attemptedTask: Option[Task[_]] = None   
-    val start = System.currentTimeMillis()
-    var taskStart: Long = 0
-    def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
-    val startGCTime = getTotalGCTime
-
-    try {
-      Accumulators.clear()
-      Thread.currentThread().setContextClassLoader(classLoader)
-
-      // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
-      // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
-      val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
-      updateDependencies(taskFiles, taskJars)   // Download any files added with addFile
-      val deserializedTask = ser.deserialize[Task[_]](
-        taskBytes, Thread.currentThread.getContextClassLoader)
-      attemptedTask = Some(deserializedTask)
-      val deserTime = System.currentTimeMillis() - start
-      taskStart = System.currentTimeMillis()
-
-      // Run it
-      val result: Any = deserializedTask.run(taskId)
-
-      // Serialize and deserialize the result to emulate what the Mesos
-      // executor does. This is useful to catch serialization errors early
-      // on in development (so when users move their local Spark programs
-      // to the cluster, they don't get surprised by serialization errors).
-      val serResult = objectSer.serialize(result)
-      deserializedTask.metrics.get.resultSize = serResult.limit()
-      val resultToReturn = objectSer.deserialize[Any](serResult)
-      val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
-        ser.serialize(Accumulators.values))
-      val serviceTime = System.currentTimeMillis() - taskStart
-      logInfo("Finished " + taskId)
-      deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
-      deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
-      deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
-      val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
-      val serializedResult = ser.serialize(taskResult)
-      localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
-    } catch {
-      case t: Throwable => {
-        val serviceTime = System.currentTimeMillis() - taskStart
-        val metrics = attemptedTask.flatMap(t => t.metrics)
-        for (m <- metrics) {
-          m.executorRunTime = serviceTime.toInt
-          m.jvmGCTime = getTotalGCTime - startGCTime
-        }
-        val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
-        localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
-      }
-    }
-  }
-
-  /**
-   * Download any missing dependencies if we receive a new set of files and JARs from the
-   * SparkContext. Also adds any new JARs we fetched to the class loader.
-   */
-  private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
-    synchronized {
-      // Fetch missing dependencies
-      for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
-        currentFiles(name) = timestamp
-      }
-
-      for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
-        Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
-        currentJars(name) = timestamp
-        // Add it to our class loader
-        val localName = name.split("/").last
-        val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
-        if (!classLoader.getURLs.contains(url)) {
-          logInfo("Adding " + url + " to class loader")
-          classLoader.addURL(url)
-        }
-      }
-    }
-  }
-
-  def statusUpdate(taskId :Long, state: TaskState, serializedData: ByteBuffer) {
-    synchronized {
-      val taskSetId = taskIdToTaskSetId(taskId)
-      val taskSetManager = activeTaskSets(taskSetId)
-      taskSetTaskIds(taskSetId) -= taskId
-      taskSetManager.statusUpdate(taskId, state, serializedData)
-    }
-  }
-
-   override def stop() {
-    threadPool.shutdownNow()
-  }
-
-  override def defaultParallelism() = threads
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
deleted file mode 100644
index e237f28..0000000
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.local
-
-import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
-import spark.TaskState.TaskState
-import spark.scheduler.{Task, TaskResult, TaskSet}
-import spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
-
-
-private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
-  extends TaskSetManager with Logging {
-
-  var parent: Schedulable = null
-  var weight: Int = 1
-  var minShare: Int = 0
-  var runningTasks: Int = 0
-  var priority: Int = taskSet.priority
-  var stageId: Int = taskSet.stageId
-  var name: String = "TaskSet_" + taskSet.stageId.toString
-
-  var failCount = new Array[Int](taskSet.tasks.size)
-  val taskInfos = new HashMap[Long, TaskInfo]
-  val numTasks = taskSet.tasks.size
-  var numFinished = 0
-  val env = SparkEnv.get
-  val ser = env.closureSerializer.newInstance()
-  val copiesRunning = new Array[Int](numTasks)
-  val finished = new Array[Boolean](numTasks)
-  val numFailures = new Array[Int](numTasks)
-  val MAX_TASK_FAILURES = sched.maxFailures
-
-  override def increaseRunningTasks(taskNum: Int): Unit = {
-    runningTasks += taskNum
-    if (parent != null) {
-     parent.increaseRunningTasks(taskNum)
-    }
-  }
-
-  override def decreaseRunningTasks(taskNum: Int): Unit = {
-    runningTasks -= taskNum
-    if (parent != null) {
-      parent.decreaseRunningTasks(taskNum)
-    }
-  }
-
-  override def addSchedulable(schedulable: Schedulable): Unit = {
-    // nothing
-  }
-
-  override def removeSchedulable(schedulable: Schedulable): Unit = {
-    // nothing
-  }
-
-  override def getSchedulableByName(name: String): Schedulable = {
-    return null
-  }
-
-  override def executorLost(executorId: String, host: String): Unit = {
-    // nothing
-  }
-
-  override def checkSpeculatableTasks() = true
-
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    sortedTaskSetQueue += this
-    return sortedTaskSetQueue
-  }
-
-  override def hasPendingTasks() = true
-
-  def findTask(): Option[Int] = {
-    for (i <- 0 to numTasks-1) {
-      if (copiesRunning(i) == 0 && !finished(i)) {
-        return Some(i)
-      }
-    }
-    return None
-  }
-
-  override def resourceOffer(
-      execId: String,
-      host: String,
-      availableCpus: Int,
-      maxLocality: TaskLocality.TaskLocality)
-    : Option[TaskDescription] =
-  {
-    SparkEnv.set(sched.env)
-    logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
-      availableCpus.toInt, numFinished, numTasks))
-    if (availableCpus > 0 && numFinished < numTasks) {
-      findTask() match {
-        case Some(index) =>
-          val taskId = sched.attemptId.getAndIncrement()
-          val task = taskSet.tasks(index)
-          val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
-            TaskLocality.NODE_LOCAL)
-          taskInfos(taskId) = info
-          // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-          // we assume the task can be serialized without exceptions.
-          val bytes = Task.serializeWithDependencies(
-            task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-          logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
-          val taskName = "task %s:%d".format(taskSet.id, index)
-          copiesRunning(index) += 1
-          increaseRunningTasks(1)
-          taskStarted(task, info)
-          return Some(new TaskDescription(taskId, null, taskName, index, bytes))
-        case None => {}
-      }
-    }
-    return None
-  }
-
-  override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    SparkEnv.set(env)
-    state match {
-      case TaskState.FINISHED =>
-        taskEnded(tid, state, serializedData)
-      case TaskState.FAILED =>
-        taskFailed(tid, state, serializedData)
-      case _ => {}
-    }
-  }
-
-  def taskStarted(task: Task[_], info: TaskInfo) {
-    sched.listener.taskStarted(task, info)
-  }
-
-  def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    val info = taskInfos(tid)
-    val index = info.index
-    val task = taskSet.tasks(index)
-    info.markSuccessful()
-    val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
-    result.metrics.resultSize = serializedData.limit()
-    sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
-    numFinished += 1
-    decreaseRunningTasks(1)
-    finished(index) = true
-    if (numFinished == numTasks) {
-      sched.taskSetFinished(this)
-    }
-  }
-
-  def taskFailed(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-    val info = taskInfos(tid)
-    val index = info.index
-    val task = taskSet.tasks(index)
-    info.markFailed()
-    decreaseRunningTasks(1)
-    val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
-      serializedData, getClass.getClassLoader)
-    sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
-    if (!finished(index)) {
-      copiesRunning(index) -= 1
-      numFailures(index) += 1
-      val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
-      logInfo("Loss was due to %s\n%s\n%s".format(
-        reason.className, reason.description, locs.mkString("\n")))
-      if (numFailures(index) > MAX_TASK_FAILURES) {
-        val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-          taskSet.id, index, 4, reason.description)
-        decreaseRunningTasks(runningTasks)
-        sched.listener.taskSetFailed(taskSet, errorMessage)
-        // need to delete failed Taskset from schedule queue
-        sched.taskSetFinished(this)
-      }
-    }
-  }
-
-  override def error(message: String) {
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index eef3ee1..0000000
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import spark.{SparkException, Utils, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import spark.TaskState
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
-    scheduler: ClusterScheduler,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
-  with MScheduler
-  with Logging {
-
-  val MAX_SLAVE_FAILURES = 2     // Blacklist a slave after this many failures
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
-  val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
-
-  // Cores we have acquired with each Mesos task ID
-  val coresByTaskId = new HashMap[Int, Int]
-  var totalCoresAcquired = 0
-
-  val slaveIdsWithExecutors = new HashSet[String]
-
-  val taskIdToSlaveId = new HashMap[Int, String]
-  val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
-  val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-    "Spark home is not set; set it through the spark.home system " +
-    "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
-  val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
-
-  var nextMesosTaskId = 0
-
-  def newMesosTaskId(): Int = {
-    val id = nextMesosTaskId
-    nextMesosTaskId += 1
-    id
-  }
-
-  override def start() {
-    super.start()
-
-    synchronized {
-      new Thread("CoarseMesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = CoarseMesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try { {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          }
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createCommand(offer: Offer, numCores: Int): CommandInfo = {
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val driverUrl = "akka://spark@%s:%s/user/%s".format(
-      System.getProperty("spark.driver.host"),
-      System.getProperty("spark.driver.port"),
-      StandaloneSchedulerBackend.ACTOR_NAME)
-    val uri = System.getProperty("spark.executor.uri")
-    if (uri == null) {
-      val runScript = new File(sparkHome, "spark-class").getCanonicalPath
-      command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-        runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./spark-class spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
-        basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    return command.build()
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    logInfo("Registered as framework ID " + frameworkId.getValue)
-    registeredLock.synchronized {
-      isRegistered = true
-      registeredLock.notifyAll()
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
-   * unless we've already launched more than we wanted to.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    synchronized {
-      val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
-      for (offer <- offers) {
-        val slaveId = offer.getSlaveId.toString
-        val mem = getResource(offer.getResourcesList, "mem")
-        val cpus = getResource(offer.getResourcesList, "cpus").toInt
-        if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
-            failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-            !slaveIdsWithExecutors.contains(slaveId)) {
-          // Launch an executor on the slave
-          val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-          val taskId = newMesosTaskId()
-          taskIdToSlaveId(taskId) = slaveId
-          slaveIdsWithExecutors += slaveId
-          coresByTaskId(taskId) = cpusToUse
-          val task = MesosTaskInfo.newBuilder()
-            .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-            .setSlaveId(offer.getSlaveId)
-            .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
-            .setName("Task " + taskId)
-            .addResources(createResource("cpus", cpusToUse))
-            .addResources(createResource("mem", executorMemory))
-            .build()
-          d.launchTasks(offer.getId, Collections.singletonList(task), filters)
-        } else {
-          // Filter it out
-          d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
-        }
-      }
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  private def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Build a Mesos resource protobuf object */
-  private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
-    Resource.newBuilder()
-      .setName(resourceName)
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  private def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val taskId = status.getTaskId.getValue.toInt
-    val state = status.getState
-    logInfo("Mesos task " + taskId + " is now " + state)
-    synchronized {
-      if (isFinished(state)) {
-        val slaveId = taskIdToSlaveId(taskId)
-        slaveIdsWithExecutors -= slaveId
-        taskIdToSlaveId -= taskId
-        // Remove the cores we have remembered for this task, if it's in the hashmap
-        for (cores <- coresByTaskId.get(taskId)) {
-          totalCoresAcquired -= cores
-          coresByTaskId -= taskId
-        }
-        // If it was a failure, mark the slave as failed for blacklisting purposes
-        if (state == MesosTaskState.TASK_FAILED || state == MesosTaskState.TASK_LOST) {
-          failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
-          if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
-            logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
-                "is Spark installed on it?")
-          }
-        }
-        driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
-      }
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    logError("Mesos error: " + message)
-    scheduler.error(message)
-  }
-
-  override def stop() {
-    super.stop()
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    logInfo("Mesos slave lost: " + slaveId.getValue)
-    synchronized {
-      if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
-        // Note that the slave ID corresponds to the executor ID on that slave
-        slaveIdsWithExecutors -= slaveId.getValue
-        removeExecutor(slaveId.getValue, "Mesos slave lost")
-      }
-    }
-  }
-
-  override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
-    slaveLost(d, s)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index f6069a5..0000000
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import spark.{SparkException, Utils, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import spark.TaskState
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
-    scheduler: ClusterScheduler,
-    sc: SparkContext,
-    master: String,
-    appName: String)
-  extends SchedulerBackend
-  with MScheduler
-  with Logging {
-
-  // Lock used to wait for scheduler to be registered
-  var isRegistered = false
-  val registeredLock = new Object()
-
-  // Driver for talking to Mesos
-  var driver: SchedulerDriver = null
-
-  // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
-  val taskIdToSlaveId = new HashMap[Long, String]
-
-  // An ExecutorInfo for our tasks
-  var execArgs: Array[Byte] = null
-
-  var classLoader: ClassLoader = null
-
-  override def start() {
-    synchronized {
-      classLoader = Thread.currentThread.getContextClassLoader
-
-      new Thread("MesosSchedulerBackend driver") {
-        setDaemon(true)
-        override def run() {
-          val scheduler = MesosSchedulerBackend.this
-          val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
-          driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
-          try {
-            val ret = driver.run()
-            logInfo("driver.run() returned with code " + ret)
-          } catch {
-            case e: Exception => logError("driver.run() failed", e)
-          }
-        }
-      }.start()
-
-      waitForRegister()
-    }
-  }
-
-  def createExecutorInfo(execId: String): ExecutorInfo = {
-    val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
-      "Spark home is not set; set it through the spark.home system " +
-      "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-    val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case (key, value) =>
-      environment.addVariables(Environment.Variable.newBuilder()
-        .setName(key)
-        .setValue(value)
-        .build())
-    }
-    val command = CommandInfo.newBuilder()
-      .setEnvironment(environment)
-    val uri = System.getProperty("spark.executor.uri")
-    if (uri == null) {
-      command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
-    } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
-      val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./spark-executor".format(basename))
-      command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
-    }
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
-      .build()
-    ExecutorInfo.newBuilder()
-      .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
-      .setCommand(command)
-      .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(memory)
-      .build()
-  }
-
-  /**
-   * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
-   * containing all the spark.* system properties in the form of (String, String) pairs.
-   */
-  private def createExecArg(): Array[Byte] = {
-    if (execArgs == null) {
-      val props = new HashMap[String, String]
-      val iterator = System.getProperties.entrySet.iterator
-      while (iterator.hasNext) {
-        val entry = iterator.next
-        val (key, value) = (entry.getKey.toString, entry.getValue.toString)
-        if (key.startsWith("spark.")) {
-          props(key) = value
-        }
-      }
-      // Serialize the map as an array of (String, String) pairs
-      execArgs = Utils.serialize(props.toArray)
-    }
-    return execArgs
-  }
-
-  private def setClassLoader(): ClassLoader = {
-    val oldClassLoader = Thread.currentThread.getContextClassLoader
-    Thread.currentThread.setContextClassLoader(classLoader)
-    return oldClassLoader
-  }
-
-  private def restoreClassLoader(oldClassLoader: ClassLoader) {
-    Thread.currentThread.setContextClassLoader(oldClassLoader)
-  }
-
-  override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
-  override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Registered as framework ID " + frameworkId.getValue)
-      registeredLock.synchronized {
-        isRegistered = true
-        registeredLock.notifyAll()
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  def waitForRegister() {
-    registeredLock.synchronized {
-      while (!isRegistered) {
-        registeredLock.wait()
-      }
-    }
-  }
-
-  override def disconnected(d: SchedulerDriver) {}
-
-  override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
-  /**
-   * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
-   * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
-   * tasks are balanced across the cluster.
-   */
-  override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-    val oldClassLoader = setClassLoader()
-    try {
-      synchronized {
-        // Build a big list of the offerable workers, and remember their indices so that we can
-        // figure out which Offer to reply to for each worker
-        val offerableIndices = new ArrayBuffer[Int]
-        val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
-        def enoughMemory(o: Offer) = {
-          val mem = getResource(o.getResourcesList, "mem")
-          val slaveId = o.getSlaveId.getValue
-          mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
-        }
-
-        for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
-          offerableIndices += index
-          offerableWorkers += new WorkerOffer(
-            offer.getSlaveId.getValue,
-            offer.getHostname,
-            getResource(offer.getResourcesList, "cpus").toInt)
-        }
-
-        // Call into the ClusterScheduler
-        val taskLists = scheduler.resourceOffers(offerableWorkers)
-
-        // Build a list of Mesos tasks for each slave
-        val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
-        for ((taskList, index) <- taskLists.zipWithIndex) {
-          if (!taskList.isEmpty) {
-            val offerNum = offerableIndices(index)
-            val slaveId = offers(offerNum).getSlaveId.getValue
-            slaveIdsWithExecutors += slaveId
-            mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
-            for (taskDesc <- taskList) {
-              taskIdToSlaveId(taskDesc.taskId) = slaveId
-              mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
-            }
-          }
-        }
-
-        // Reply to the offers
-        val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
-        for (i <- 0 until offers.size) {
-          d.launchTasks(offers(i).getId, mesosTasks(i), filters)
-        }
-      }
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  /** Helper function to pull out a resource from a Mesos Resources protobuf */
-  def getResource(res: JList[Resource], name: String): Double = {
-    for (r <- res if r.getName == name) {
-      return r.getScalar.getValue
-    }
-    // If we reached here, no resource with the required name was present
-    throw new IllegalArgumentException("No resource called " + name + " in " + res)
-  }
-
-  /** Turn a Spark TaskDescription into a Mesos task */
-  def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
-    val cpuResource = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder().setValue(1).build())
-      .build()
-    return MesosTaskInfo.newBuilder()
-      .setTaskId(taskId)
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
-      .setExecutor(createExecutorInfo(slaveId))
-      .setName(task.name)
-      .addResources(cpuResource)
-      .setData(ByteString.copyFrom(task.serializedTask))
-      .build()
-  }
-
-  /** Check whether a Mesos task state represents a finished task */
-  def isFinished(state: MesosTaskState) = {
-    state == MesosTaskState.TASK_FINISHED ||
-      state == MesosTaskState.TASK_FAILED ||
-      state == MesosTaskState.TASK_KILLED ||
-      state == MesosTaskState.TASK_LOST
-  }
-
-  override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-    val oldClassLoader = setClassLoader()
-    try {
-      val tid = status.getTaskId.getValue.toLong
-      val state = TaskState.fromMesos(status.getState)
-      synchronized {
-        if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
-          // We lost the executor on this slave, so remember that it's gone
-          slaveIdsWithExecutors -= taskIdToSlaveId(tid)
-        }
-        if (isFinished(status.getState)) {
-          taskIdToSlaveId.remove(tid)
-        }
-      }
-      scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def error(d: SchedulerDriver, message: String) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logError("Mesos error: " + message)
-      scheduler.error(message)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def stop() {
-    if (driver != null) {
-      driver.stop()
-    }
-  }
-
-  override def reviveOffers() {
-    driver.reviveOffers()
-  }
-
-  override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
-  private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
-    val oldClassLoader = setClassLoader()
-    try {
-      logInfo("Mesos slave lost: " + slaveId.getValue)
-      synchronized {
-        slaveIdsWithExecutors -= slaveId.getValue
-      }
-      scheduler.executorLost(slaveId.getValue, reason)
-    } finally {
-      restoreClassLoader(oldClassLoader)
-    }
-  }
-
-  override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
-    recordSlaveLost(d, slaveId, SlaveLost())
-  }
-
-  override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
-                            slaveId: SlaveID, status: Int) {
-    logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
-                                                                 slaveId.getValue))
-    recordSlaveLost(d, slaveId, ExecutorExited(status))
-  }
-
-  // TODO: query Mesos for number of cores
-  override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
deleted file mode 100644
index dc94d42..0000000
--- a/core/src/main/scala/spark/serializer/Serializer.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.serializer
-
-import java.io.{EOFException, InputStream, OutputStream}
-import java.nio.ByteBuffer
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import spark.util.ByteBufferInputStream
-
-
-/**
- * A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create [[spark.serializer.SerializerInstance]] objects that do the actual serialization and are
- * guaranteed to only be called from one thread at a time.
- */
-trait Serializer {
-  def newInstance(): SerializerInstance
-}
-
-
-/**
- * An instance of a serializer, for use by one thread at a time.
- */
-trait SerializerInstance {
-  def serialize[T](t: T): ByteBuffer
-
-  def deserialize[T](bytes: ByteBuffer): T
-
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T
-
-  def serializeStream(s: OutputStream): SerializationStream
-
-  def deserializeStream(s: InputStream): DeserializationStream
-
-  def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
-    // Default implementation uses serializeStream
-    val stream = new FastByteArrayOutputStream()
-    serializeStream(stream).writeAll(iterator)
-    val buffer = ByteBuffer.allocate(stream.position.toInt)
-    buffer.put(stream.array, 0, stream.position.toInt)
-    buffer.flip()
-    buffer
-  }
-
-  def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
-    // Default implementation uses deserializeStream
-    buffer.rewind()
-    deserializeStream(new ByteBufferInputStream(buffer)).asIterator
-  }
-}
-
-
-/**
- * A stream for writing serialized objects.
- */
-trait SerializationStream {
-  def writeObject[T](t: T): SerializationStream
-  def flush(): Unit
-  def close(): Unit
-
-  def writeAll[T](iter: Iterator[T]): SerializationStream = {
-    while (iter.hasNext) {
-      writeObject(iter.next())
-    }
-    this
-  }
-}
-
-
-/**
- * A stream for reading serialized objects.
- */
-trait DeserializationStream {
-  def readObject[T](): T
-  def close(): Unit
-
-  /**
-   * Read the elements of this stream through an iterator. This can only be called once, as
-   * reading each element will consume data from the input source.
-   */
-  def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] {
-    override protected def getNext() = {
-      try {
-        readObject[Any]()
-      } catch {
-        case eof: EOFException =>
-          finished = true
-      }
-    }
-
-    override protected def close() {
-      DeserializationStream.this.close()
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/serializer/SerializerManager.scala b/core/src/main/scala/spark/serializer/SerializerManager.scala
deleted file mode 100644
index b7b2470..0000000
--- a/core/src/main/scala/spark/serializer/SerializerManager.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.serializer
-
-import java.util.concurrent.ConcurrentHashMap
-
-
-/**
- * A service that returns a serializer object given the serializer's class name. If a previous
- * instance of the serializer object has been created, the get method returns that instead of
- * creating a new one.
- */
-private[spark] class SerializerManager {
-
-  private val serializers = new ConcurrentHashMap[String, Serializer]
-  private var _default: Serializer = _
-
-  def default = _default
-
-  def setDefault(clsName: String): Serializer = {
-    _default = get(clsName)
-    _default
-  }
-
-  def get(clsName: String): Serializer = {
-    if (clsName == null) {
-      default
-    } else {
-      var serializer = serializers.get(clsName)
-      if (serializer != null) {
-        // If the serializer has been created previously, reuse that.
-        serializer
-      } else this.synchronized {
-        // Otherwise, create a new one. But make sure no other thread has attempted
-        // to create another new one at the same time.
-        serializer = serializers.get(clsName)
-        if (serializer == null) {
-          val clsLoader = Thread.currentThread.getContextClassLoader
-          serializer =
-            Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
-          serializers.put(clsName, serializer)
-        }
-        serializer
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockException.scala b/core/src/main/scala/spark/storage/BlockException.scala
deleted file mode 100644
index 8ebfaf3..0000000
--- a/core/src/main/scala/spark/storage/BlockException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-private[spark]
-case class BlockException(blockId: String, message: String) extends Exception(message)
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockFetchTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
deleted file mode 100644
index 265e554..0000000
--- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-private[spark] trait BlockFetchTracker {
-  def totalBlocks : Int
-  def numLocalBlocks: Int
-  def numRemoteBlocks: Int
-  def remoteFetchTime : Long
-  def fetchWaitTime: Long
-  def remoteBytesRead : Long
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
deleted file mode 100644
index 568783d..0000000
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Queue
-
-import io.netty.buffer.ByteBuf
-
-import spark.Logging
-import spark.Utils
-import spark.SparkException
-import spark.network.BufferMessage
-import spark.network.ConnectionManagerId
-import spark.network.netty.ShuffleCopier
-import spark.serializer.Serializer
-
-
-/**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
- */
-
-private[storage]
-trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])]
-  with Logging with BlockFetchTracker {
-  def initialize()
-}
-
-
-private[storage]
-object BlockFetcherIterator {
-
-  // A request to fetch one or more blocks, complete with their sizes
-  class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
-    val size = blocks.map(_._2).sum
-  }
-
-  // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
-  // the block (since we want all deserializaton to happen in the calling thread); can also
-  // represent a fetch failure if size == -1.
-  class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
-    def failed: Boolean = size == -1
-  }
-
-  class BasicBlockFetcherIterator(
-      private val blockManager: BlockManager,
-      val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
-      serializer: Serializer)
-    extends BlockFetcherIterator {
-
-    import blockManager._
-
-    private var _remoteBytesRead = 0l
-    private var _remoteFetchTime = 0l
-    private var _fetchWaitTime = 0l
-
-    if (blocksByAddress == null) {
-      throw new IllegalArgumentException("BlocksByAddress is null")
-    }
-
-    // Total number blocks fetched (local + remote). Also number of FetchResults expected
-    protected var _numBlocksToFetch = 0
-
-    protected var startTime = System.currentTimeMillis
-
-    // This represents the number of local blocks, also counting zero-sized blocks
-    private var numLocal = 0
-    // BlockIds for local blocks that need to be fetched. Excludes zero-sized blocks
-    protected val localBlocksToFetch = new ArrayBuffer[String]()
-
-    // This represents the number of remote blocks, also counting zero-sized blocks
-    private var numRemote = 0
-    // BlockIds for remote blocks that need to be fetched. Excludes zero-sized blocks
-    protected val remoteBlocksToFetch = new HashSet[String]()
-
-    // A queue to hold our results.
-    protected val results = new LinkedBlockingQueue[FetchResult]
-
-    // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
-    // the number of bytes in flight is limited to maxBytesInFlight
-    private val fetchRequests = new Queue[FetchRequest]
-
-    // Current bytes in flight from our requests
-    private var bytesInFlight = 0L
-
-    protected def sendRequest(req: FetchRequest) {
-      logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
-      val cmId = new ConnectionManagerId(req.address.host, req.address.port)
-      val blockMessageArray = new BlockMessageArray(req.blocks.map {
-        case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
-      })
-      bytesInFlight += req.size
-      val sizeMap = req.blocks.toMap  // so we can look up the size of each blockID
-      val fetchStart = System.currentTimeMillis()
-      val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
-      future.onSuccess {
-        case Some(message) => {
-          val fetchDone = System.currentTimeMillis()
-          _remoteFetchTime += fetchDone - fetchStart
-          val bufferMessage = message.asInstanceOf[BufferMessage]
-          val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
-          for (blockMessage <- blockMessageArray) {
-            if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
-              throw new SparkException(
-                "Unexpected message " + blockMessage.getType + " received from " + cmId)
-            }
-            val blockId = blockMessage.getId
-            val networkSize = blockMessage.getData.limit()
-            results.put(new FetchResult(blockId, sizeMap(blockId),
-              () => dataDeserialize(blockId, blockMessage.getData, serializer)))
-            _remoteBytesRead += networkSize
-            logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
-          }
-        }
-        case None => {
-          logError("Could not get block(s) from " + cmId)
-          for ((blockId, size) <- req.blocks) {
-            results.put(new FetchResult(blockId, -1, null))
-          }
-        }
-      }
-    }
-
-    protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-      // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
-      // at most maxBytesInFlight in order to limit the amount of data in flight.
-      val remoteRequests = new ArrayBuffer[FetchRequest]
-      for ((address, blockInfos) <- blocksByAddress) {
-        if (address == blockManagerId) {
-          numLocal = blockInfos.size
-          // Filter out zero-sized blocks
-          localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
-          _numBlocksToFetch += localBlocksToFetch.size
-        } else {
-          numRemote += blockInfos.size
-          // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
-          // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-          // nodes, rather than blocking on reading output from one node.
-          val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
-          logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
-          val iterator = blockInfos.iterator
-          var curRequestSize = 0L
-          var curBlocks = new ArrayBuffer[(String, Long)]
-          while (iterator.hasNext) {
-            val (blockId, size) = iterator.next()
-            // Skip empty blocks
-            if (size > 0) {
-              curBlocks += ((blockId, size))
-              remoteBlocksToFetch += blockId
-              _numBlocksToFetch += 1
-              curRequestSize += size
-            } else if (size < 0) {
-              throw new BlockException(blockId, "Negative block size " + size)
-            }
-            if (curRequestSize >= minRequestSize) {
-              // Add this FetchRequest
-              remoteRequests += new FetchRequest(address, curBlocks)
-              curRequestSize = 0
-              curBlocks = new ArrayBuffer[(String, Long)]
-            }
-          }
-          // Add in the final request
-          if (!curBlocks.isEmpty) {
-            remoteRequests += new FetchRequest(address, curBlocks)
-          }
-        }
-      }
-      logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
-        totalBlocks + " blocks")
-      remoteRequests
-    }
-
-    protected def getLocalBlocks() {
-      // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
-      // these all at once because they will just memory-map some files, so they won't consume
-      // any memory that might exceed our maxBytesInFlight
-      for (id <- localBlocksToFetch) {
-        getLocalFromDisk(id, serializer) match {
-          case Some(iter) => {
-            // Pass 0 as size since it's not in flight
-            results.put(new FetchResult(id, 0, () => iter))
-            logDebug("Got local block " + id)
-          }
-          case None => {
-            throw new BlockException(id, "Could not get block " + id + " from local machine")
-          }
-        }
-      }
-    }
-
-    override def initialize() {
-      // Split local and remote blocks.
-      val remoteRequests = splitLocalRemoteBlocks()
-      // Add the remote requests into our queue in a random order
-      fetchRequests ++= Utils.randomize(remoteRequests)
-
-      // Send out initial requests for blocks, up to our maxBytesInFlight
-      while (!fetchRequests.isEmpty &&
-        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-        sendRequest(fetchRequests.dequeue())
-      }
-
-      val numGets = remoteRequests.size - fetchRequests.size
-      logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
-
-      // Get Local Blocks
-      startTime = System.currentTimeMillis
-      getLocalBlocks()
-      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
-    }
-
-    //an iterator that will read fetched blocks off the queue as they arrive.
-    @volatile protected var resultsGotten = 0
-
-    override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
-
-    override def next(): (String, Option[Iterator[Any]]) = {
-      resultsGotten += 1
-      val startFetchWait = System.currentTimeMillis()
-      val result = results.take()
-      val stopFetchWait = System.currentTimeMillis()
-      _fetchWaitTime += (stopFetchWait - startFetchWait)
-      if (! result.failed) bytesInFlight -= result.size
-      while (!fetchRequests.isEmpty &&
-        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
-        sendRequest(fetchRequests.dequeue())
-      }
-      (result.blockId, if (result.failed) None else Some(result.deserialize()))
-    }
-
-    // Implementing BlockFetchTracker trait.
-    override def totalBlocks: Int = numLocal + numRemote
-    override def numLocalBlocks: Int = numLocal
-    override def numRemoteBlocks: Int = numRemote
-    override def remoteFetchTime: Long = _remoteFetchTime
-    override def fetchWaitTime: Long = _fetchWaitTime
-    override def remoteBytesRead: Long = _remoteBytesRead
-  }
-  // End of BasicBlockFetcherIterator
-
-  class NettyBlockFetcherIterator(
-      blockManager: BlockManager,
-      blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
-      serializer: Serializer)
-    extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer) {
-
-    import blockManager._
-
-    val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest]
-
-    private def startCopiers(numCopiers: Int): List[_ <: Thread] = {
-      (for ( i <- Range(0,numCopiers) ) yield {
-        val copier = new Thread {
-          override def run(){
-            try {
-              while(!isInterrupted && !fetchRequestsSync.isEmpty) {
-                sendRequest(fetchRequestsSync.take())
-              }
-            } catch {
-              case x: InterruptedException => logInfo("Copier Interrupted")
-              //case _ => throw new SparkException("Exception Throw in Shuffle Copier")
-            }
-          }
-        }
-        copier.start
-        copier
-      }).toList
-    }
-
-    // keep this to interrupt the threads when necessary
-    private def stopCopiers() {
-      for (copier <- copiers) {
-        copier.interrupt()
-      }
-    }
-
-    override protected def sendRequest(req: FetchRequest) {
-
-      def putResult(blockId: String, blockSize: Long, blockData: ByteBuf) {
-        val fetchResult = new FetchResult(blockId, blockSize,
-          () => dataDeserialize(blockId, blockData.nioBuffer, serializer))
-        results.put(fetchResult)
-      }
-
-      logDebug("Sending request for %d blocks (%s) from %s".format(
-        req.blocks.size, Utils.bytesToString(req.size), req.address.host))
-      val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
-      val cpier = new ShuffleCopier
-      cpier.getBlocks(cmId, req.blocks, putResult)
-      logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
-    }
-
-    private var copiers: List[_ <: Thread] = null
-
-    override def initialize() {
-      // Split Local Remote Blocks and set numBlocksToFetch
-      val remoteRequests = splitLocalRemoteBlocks()
-      // Add the remote requests into our queue in a random order
-      for (request <- Utils.randomize(remoteRequests)) {
-        fetchRequestsSync.put(request)
-      }
-
-      copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt)
-      logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
-        Utils.getUsedTimeMs(startTime))
-
-      // Get Local Blocks
-      startTime = System.currentTimeMillis
-      getLocalBlocks()
-      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
-    }
-
-    override def next(): (String, Option[Iterator[Any]]) = {
-      resultsGotten += 1
-      val result = results.take()
-      // If all the results has been retrieved, copiers will exit automatically
-      (result.blockId, if (result.failed) None else Some(result.deserialize()))
-    }
-  }
-  // End of NettyBlockFetcherIterator
-}