You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:00 UTC
[16/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
deleted file mode 100644
index 0248830..0000000
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-import java.nio.ByteBuffer
-
-import spark.TaskState.TaskState
-import spark.scheduler.TaskSet
-
-/**
- * Tracks and schedules the tasks within a single TaskSet. This class keeps track of the status of
- * each task and is responsible for retries on failure and locality. The main interfaces to it
- * are resourceOffer, which asks the TaskSet whether it wants to run a task on one node, and
- * statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
- *
- * THREADING: This class is designed to only be called from code with a lock on the TaskScheduler
- * (e.g. its event handlers). It should not be called from other threads.
- */
-private[spark] trait TaskSetManager extends Schedulable {
- // Defaults to null: this trait does not supply a queue of child schedulables.
- def schedulableQueue = null
-
- // Defaults to SchedulingMode.NONE for task set managers.
- def schedulingMode = SchedulingMode.NONE
-
- // The TaskSet whose tasks this manager tracks and schedules.
- def taskSet: TaskSet
-
- // Asks this manager whether it wants to run a task given the offered executor, host and
- // number of available CPUs. Returns the description of the task to launch, or None when
- // the manager declines the offer.
- // NOTE(review): maxLocality presumably bounds the locality level of the chosen task --
- // confirm against the concrete implementations.
- def resourceOffer(
- execId: String,
- host: String,
- availableCpus: Int,
- maxLocality: TaskLocality.TaskLocality)
- : Option[TaskDescription]
-
- // Tells this manager that task `tid` changed state (e.g. finished); serializedData is the
- // payload delivered together with the state change.
- def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
-
- // Reports an error message to this manager; how it is handled is up to the implementation.
- def error(message: String)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
deleted file mode 100644
index 1d09bd9..0000000
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.cluster
-
-/**
- * Represents free resources available on an executor.
- */
-private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
deleted file mode 100644
index 5be4dbd..0000000
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.local
-
-import java.io.File
-import java.lang.management.ManagementFactory
-import java.util.concurrent.atomic.AtomicInteger
-import java.nio.ByteBuffer
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import spark._
-import spark.TaskState.TaskState
-import spark.executor.ExecutorURLClassLoader
-import spark.scheduler._
-import spark.scheduler.cluster._
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-import akka.actor._
-
-/**
- * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally
- * the scheduler also allows each task to fail up to maxFailures times, which is useful for
- * testing fault recovery.
- */
-
-private[spark]
-case class LocalReviveOffers()
-
-private[spark]
-case class LocalStatusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer)
-
-// Actor that keeps the count of free cores for a LocalScheduler and launches tasks on the
-// scheduler's thread pool. `freeCores` is only mutated inside this actor's message handling.
-private[spark]
-class LocalActor(localScheduler: LocalScheduler, var freeCores: Int) extends Actor with Logging {
-
- def receive = {
- // NOTE(review): this pattern is a stable identifier, so it matches the LocalReviveOffers
- // companion object (which is what LocalScheduler.submitTasks sends), not an instance
- // created with LocalReviveOffers().
- case LocalReviveOffers =>
- launchTask(localScheduler.resourceOffer(freeCores))
- case LocalStatusUpdate(taskId, state, serializeData) =>
- // Every status update gives one core back, whatever the reported state is, and then
- // immediately tries to fill the freed core with another task.
- freeCores += 1
- localScheduler.statusUpdate(taskId, state, serializeData)
- launchTask(localScheduler.resourceOffer(freeCores))
- }
-
- // Takes one core per task and submits each task to the scheduler's thread pool, where it is
- // executed through LocalScheduler.runTask.
- def launchTask(tasks : Seq[TaskDescription]) {
- for (task <- tasks) {
- freeCores -= 1
- localScheduler.threadPool.submit(new Runnable {
- def run() {
- localScheduler.runTask(task.taskId, task.serializedTask)
- }
- })
- }
- }
-}
-
-private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: SparkContext)
- extends TaskScheduler
- with Logging {
-
- var attemptId = new AtomicInteger(0)
- var threadPool = Utils.newDaemonFixedThreadPool(threads)
- val env = SparkEnv.get
- var listener: TaskSchedulerListener = null
-
- // Application dependencies (added through SparkContext) that we've fetched so far on this node.
- // Each map holds the master's timestamp for the version of that file or JAR we got.
- val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
- val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
-
- val classLoader = new ExecutorURLClassLoader(Array(), Thread.currentThread.getContextClassLoader)
-
- var schedulableBuilder: SchedulableBuilder = null
- var rootPool: Pool = null
- val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.cluster.schedulingmode", "FIFO"))
- val activeTaskSets = new HashMap[String, TaskSetManager]
- val taskIdToTaskSetId = new HashMap[Long, String]
- val taskSetTaskIds = new HashMap[String, HashSet[Long]]
-
- var localActor: ActorRef = null
-
- override def start() {
- // temporarily set rootPool name to empty
- rootPool = new Pool("", schedulingMode, 0, 0)
- schedulableBuilder = {
- schedulingMode match {
- case SchedulingMode.FIFO =>
- new FIFOSchedulableBuilder(rootPool)
- case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
- }
- }
- schedulableBuilder.buildPools()
-
- localActor = env.actorSystem.actorOf(Props(new LocalActor(this, threads)), "Test")
- }
-
- override def setListener(listener: TaskSchedulerListener) {
- this.listener = listener
- }
-
- override def submitTasks(taskSet: TaskSet) {
- synchronized {
- val manager = new LocalTaskSetManager(this, taskSet)
- schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
- activeTaskSets(taskSet.id) = manager
- taskSetTaskIds(taskSet.id) = new HashSet[Long]()
- localActor ! LocalReviveOffers
- }
- }
-
- /**
-  * Offers `freeCores` cores to the task sets in the root pool, in the pool's sorted order, and
-  * returns the tasks that should be launched. Each manager is offered cores repeatedly until it
-  * declines; every accepted task is recorded in the task-id bookkeeping maps.
-  */
- def resourceOffer(freeCores: Int): Seq[TaskDescription] = synchronized {
-   val launched = new ArrayBuffer[TaskDescription](freeCores)
-   val queue = rootPool.getSortedTaskSetQueue()
-   queue.foreach { m =>
-     logDebug("parentName:%s,name:%s,runningTasks:%s".format(
-       m.parent.name, m.name, m.runningTasks))
-   }
-
-   var coresLeft = freeCores
-   queue.foreach { manager =>
-     // Keep offering to the same manager until it has nothing more to launch.
-     var offer = manager.resourceOffer(null, null, coresLeft, null)
-     while (offer.isDefined) {
-       val task = offer.get
-       launched += task
-       taskIdToTaskSetId(task.taskId) = manager.taskSet.id
-       taskSetTaskIds(manager.taskSet.id) += task.taskId
-       coresLeft -= 1
-       offer = manager.resourceOffer(null, null, coresLeft, null)
-     }
-   }
-   launched
- }
-
- def taskSetFinished(manager: TaskSetManager) {
- synchronized {
- activeTaskSets -= manager.taskSet.id
- manager.parent.removeSchedulable(manager)
- logInfo("Remove TaskSet %s from pool %s".format(manager.taskSet.id, manager.parent.name))
- taskIdToTaskSetId --= taskSetTaskIds(manager.taskSet.id)
- taskSetTaskIds -= manager.taskSet.id
- }
- }
-
- // Runs one serialized task on the calling thread (LocalActor.launchTask invokes this from the
- // scheduler's thread pool). The outcome is reported by sending a LocalStatusUpdate to
- // localActor: FINISHED with a serialized TaskResult on success, FAILED with a serialized
- // ExceptionFailure when anything is thrown.
- def runTask(taskId: Long, bytes: ByteBuffer) {
- logInfo("Running " + taskId)
- // NOTE(review): `info` is not read anywhere else in this method.
- val info = new TaskInfo(taskId, 0, System.currentTimeMillis(), "local", "local:1", TaskLocality.NODE_LOCAL)
- // Set the Spark execution environment for the worker thread
- SparkEnv.set(env)
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objectSer = SparkEnv.get.serializer.newInstance()
- // Stays None until the task has been deserialized, so the failure path knows whether
- // there are task metrics to fill in.
- var attemptedTask: Option[Task[_]] = None
- val start = System.currentTimeMillis()
- var taskStart: Long = 0
- // Sum of the collection times reported by all garbage collector MXBeans.
- def getTotalGCTime = ManagementFactory.getGarbageCollectorMXBeans.map(g => g.getCollectionTime).sum
- val startGCTime = getTotalGCTime
-
- try {
- Accumulators.clear()
- Thread.currentThread().setContextClassLoader(classLoader)
-
- // Serialize and deserialize the task so that accumulators are changed to thread-local ones;
- // this adds a bit of unnecessary overhead but matches how the Mesos Executor works.
- val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes)
- updateDependencies(taskFiles, taskJars) // Download any files added with addFile
- val deserializedTask = ser.deserialize[Task[_]](
- taskBytes, Thread.currentThread.getContextClassLoader)
- attemptedTask = Some(deserializedTask)
- val deserTime = System.currentTimeMillis() - start
- taskStart = System.currentTimeMillis()
-
- // Run it
- val result: Any = deserializedTask.run(taskId)
-
- // Serialize and deserialize the result to emulate what the Mesos
- // executor does. This is useful to catch serialization errors early
- // on in development (so when users move their local Spark programs
- // to the cluster, they don't get surprised by serialization errors).
- val serResult = objectSer.serialize(result)
- deserializedTask.metrics.get.resultSize = serResult.limit()
- // NOTE(review): `resultToReturn` is not used below; the TaskResult wraps the original
- // `result`, so this round trip only serves to surface serialization errors.
- val resultToReturn = objectSer.deserialize[Any](serResult)
- val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]](
- ser.serialize(Accumulators.values))
- val serviceTime = System.currentTimeMillis() - taskStart
- logInfo("Finished " + taskId)
- deserializedTask.metrics.get.executorRunTime = serviceTime.toInt
- deserializedTask.metrics.get.jvmGCTime = getTotalGCTime - startGCTime
- deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt
- val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null))
- val serializedResult = ser.serialize(taskResult)
- localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult)
- } catch {
- // Every Throwable is turned into a FAILED status update rather than propagated.
- case t: Throwable => {
- val serviceTime = System.currentTimeMillis() - taskStart
- // Metrics exist only if the task was deserialized before the failure happened.
- val metrics = attemptedTask.flatMap(t => t.metrics)
- for (m <- metrics) {
- m.executorRunTime = serviceTime.toInt
- m.jvmGCTime = getTotalGCTime - startGCTime
- }
- val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
- localActor ! LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure))
- }
- }
- }
-
- /**
- * Download any missing dependencies if we receive a new set of files and JARs from the
- * SparkContext. Also adds any new JARs we fetched to the class loader.
- */
- private def updateDependencies(newFiles: HashMap[String, Long], newJars: HashMap[String, Long]) {
- synchronized {
- // Fetch missing dependencies
- for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
- currentFiles(name) = timestamp
- }
-
- for ((name, timestamp) <- newJars if currentJars.getOrElse(name, -1L) < timestamp) {
- logInfo("Fetching " + name + " with timestamp " + timestamp)
- Utils.fetchFile(name, new File(SparkFiles.getRootDirectory))
- currentJars(name) = timestamp
- // Add it to our class loader
- val localName = name.split("/").last
- val url = new File(SparkFiles.getRootDirectory, localName).toURI.toURL
- if (!classLoader.getURLs.contains(url)) {
- logInfo("Adding " + url + " to class loader")
- classLoader.addURL(url)
- }
- }
- }
- }
-
- def statusUpdate(taskId :Long, state: TaskState, serializedData: ByteBuffer) {
- synchronized {
- val taskSetId = taskIdToTaskSetId(taskId)
- val taskSetManager = activeTaskSets(taskSetId)
- taskSetTaskIds(taskSetId) -= taskId
- taskSetManager.statusUpdate(taskId, state, serializedData)
- }
- }
-
- override def stop() {
- threadPool.shutdownNow()
- }
-
- override def defaultParallelism() = threads
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
deleted file mode 100644
index e237f28..0000000
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.local
-
-import java.nio.ByteBuffer
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import spark.{ExceptionFailure, Logging, SparkEnv, Success, TaskState}
-import spark.TaskState.TaskState
-import spark.scheduler.{Task, TaskResult, TaskSet}
-import spark.scheduler.cluster.{Schedulable, TaskDescription, TaskInfo, TaskLocality, TaskSetManager}
-
-
-private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
- extends TaskSetManager with Logging {
-
- var parent: Schedulable = null
- var weight: Int = 1
- var minShare: Int = 0
- var runningTasks: Int = 0
- var priority: Int = taskSet.priority
- var stageId: Int = taskSet.stageId
- var name: String = "TaskSet_" + taskSet.stageId.toString
-
- var failCount = new Array[Int](taskSet.tasks.size)
- val taskInfos = new HashMap[Long, TaskInfo]
- val numTasks = taskSet.tasks.size
- var numFinished = 0
- val env = SparkEnv.get
- val ser = env.closureSerializer.newInstance()
- val copiesRunning = new Array[Int](numTasks)
- val finished = new Array[Boolean](numTasks)
- val numFailures = new Array[Int](numTasks)
- val MAX_TASK_FAILURES = sched.maxFailures
-
- override def increaseRunningTasks(taskNum: Int): Unit = {
- runningTasks += taskNum
- if (parent != null) {
- parent.increaseRunningTasks(taskNum)
- }
- }
-
- override def decreaseRunningTasks(taskNum: Int): Unit = {
- runningTasks -= taskNum
- if (parent != null) {
- parent.decreaseRunningTasks(taskNum)
- }
- }
-
- override def addSchedulable(schedulable: Schedulable): Unit = {
- // nothing
- }
-
- override def removeSchedulable(schedulable: Schedulable): Unit = {
- // nothing
- }
-
- override def getSchedulableByName(name: String): Schedulable = {
- return null
- }
-
- override def executorLost(executorId: String, host: String): Unit = {
- // nothing
- }
-
- override def checkSpeculatableTasks() = true
-
- /** Returns a fresh buffer whose only element is this manager. */
- override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
-   ArrayBuffer[TaskSetManager](this)
- }
-
- override def hasPendingTasks() = true
-
- /**
-  * Returns the lowest task index that has no running copy and is not finished yet, or None
-  * when every task is either running or finished.
-  */
- def findTask(): Option[Int] = {
-   (0 until numTasks).find(i => copiesRunning(i) == 0 && !finished(i))
- }
-
- /**
-  * Picks the next runnable task of this task set, if any, when at least one CPU is offered.
-  * The chosen task is serialized with its file and JAR dependencies, counted as running,
-  * announced to the listener and returned as a TaskDescription. The executor, host and
-  * locality arguments are not consulted by this local implementation.
-  */
- override def resourceOffer(
-     execId: String,
-     host: String,
-     availableCpus: Int,
-     maxLocality: TaskLocality.TaskLocality)
-   : Option[TaskDescription] =
- {
-   SparkEnv.set(sched.env)
-   logDebug("availableCpus:%d, numFinished:%d, numTasks:%d".format(
-     availableCpus.toInt, numFinished, numTasks))
-   if (availableCpus <= 0 || numFinished >= numTasks) {
-     // Nothing to offer against, or nothing left to run.
-     None
-   } else {
-     findTask().map { index =>
-       val taskId = sched.attemptId.getAndIncrement()
-       val task = taskSet.tasks(index)
-       val info = new TaskInfo(taskId, index, System.currentTimeMillis(), "local", "local:1",
-         TaskLocality.NODE_LOCAL)
-       taskInfos(taskId) = info
-       // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
-       // we assume the task can be serialized without exceptions.
-       val bytes = Task.serializeWithDependencies(
-         task, sched.sc.addedFiles, sched.sc.addedJars, ser)
-       logInfo("Size of task " + taskId + " is " + bytes.limit + " bytes")
-       val taskName = "task %s:%d".format(taskSet.id, index)
-       copiesRunning(index) += 1
-       increaseRunningTasks(1)
-       taskStarted(task, info)
-       new TaskDescription(taskId, null, taskName, index, bytes)
-     }
-   }
- }
-
- override def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- SparkEnv.set(env)
- state match {
- case TaskState.FINISHED =>
- taskEnded(tid, state, serializedData)
- case TaskState.FAILED =>
- taskFailed(tid, state, serializedData)
- case _ => {}
- }
- }
-
- def taskStarted(task: Task[_], info: TaskInfo) {
- sched.listener.taskStarted(task, info)
- }
-
- def taskEnded(tid: Long, state: TaskState, serializedData: ByteBuffer) {
- val info = taskInfos(tid)
- val index = info.index
- val task = taskSet.tasks(index)
- info.markSuccessful()
- val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
- result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(task, Success, result.value, result.accumUpdates, info, result.metrics)
- numFinished += 1
- decreaseRunningTasks(1)
- finished(index) = true
- if (numFinished == numTasks) {
- sched.taskSetFinished(this)
- }
- }
-
- /**
-  * Handles a FAILED status update for task attempt `tid`.
-  *
-  * The attempt is marked as failed, the running-task count is decreased and the listener is
-  * told that the task ended with the deserialized ExceptionFailure. If the task's index is not
-  * finished yet, its failure count is increased; once that count exceeds MAX_TASK_FAILURES the
-  * whole task set is reported as failed and removed from the scheduler.
-  *
-  * @param tid            id of the failed task attempt; must be a key of taskInfos
-  * @param state          the reported task state (not consulted here)
-  * @param serializedData serialized ExceptionFailure describing the failure
-  */
- def taskFailed(tid: Long, state: TaskState, serializedData: ByteBuffer) {
-   val info = taskInfos(tid)
-   val index = info.index
-   val task = taskSet.tasks(index)
-   info.markFailed()
-   decreaseRunningTasks(1)
-   val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](
-     serializedData, getClass.getClassLoader)
-   sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null))
-   if (!finished(index)) {
-     copiesRunning(index) -= 1
-     numFailures(index) += 1
-     val locs = reason.stackTrace.map(loc => "\tat %s".format(loc.toString))
-     logInfo("Loss was due to %s\n%s\n%s".format(
-       reason.className, reason.description, locs.mkString("\n")))
-     if (numFailures(index) > MAX_TASK_FAILURES) {
-       // Report the configured limit; it comes from sched.maxFailures and was previously
-       // hard-coded as 4 in this message.
-       val errorMessage = "Task %s:%d failed more than %d times; aborting job %s".format(
-         taskSet.id, index, MAX_TASK_FAILURES, reason.description)
-       // Zero out the running-task count of this task set (and of its parents).
-       decreaseRunningTasks(runningTasks)
-       sched.listener.taskSetFailed(taskSet, errorMessage)
-       // need to delete failed Taskset from schedule queue
-       sched.taskSetFinished(this)
-     }
-   }
- }
-
- override def error(message: String) {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
deleted file mode 100644
index eef3ee1..0000000
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import spark.{SparkException, Utils, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import spark.TaskState
-
-/**
- * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
- * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
- * a task is done. It launches Spark tasks within the coarse-grained Mesos tasks using the
- * StandaloneBackend mechanism. This class is useful for lower and more predictable latency.
- *
- * Unfortunately this has a bit of duplication from MesosSchedulerBackend, but it seems hard to
- * remove this.
- */
-private[spark] class CoarseMesosSchedulerBackend(
- scheduler: ClusterScheduler,
- sc: SparkContext,
- master: String,
- appName: String)
- extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
- with MScheduler
- with Logging {
-
- val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
- val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
-
- // Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[Int, Int]
- var totalCoresAcquired = 0
-
- val slaveIdsWithExecutors = new HashSet[String]
-
- val taskIdToSlaveId = new HashMap[Int, String]
- val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed
-
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
-
- val extraCoresPerSlave = System.getProperty("spark.mesos.extra.cores", "0").toInt
-
- var nextMesosTaskId = 0
-
- def newMesosTaskId(): Int = {
- val id = nextMesosTaskId
- nextMesosTaskId += 1
- id
- }
-
- override def start() {
- super.start()
-
- synchronized {
- new Thread("CoarseMesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = CoarseMesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try { {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- }
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createCommand(offer: Offer, numCores: Int): CommandInfo = {
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val driverUrl = "akka://spark@%s:%s/user/%s".format(
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port"),
- StandaloneSchedulerBackend.ACTOR_NAME)
- val uri = System.getProperty("spark.executor.uri")
- if (uri == null) {
- val runScript = new File(sparkHome, "spark-class").getCanonicalPath
- command.setValue("\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./spark-class spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
- basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- return command.build()
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
-  * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
-  * unless we've already launched more than we wanted to.
-  *
-  * An executor is launched on an offer only when the core cap (maxCores) is not reached yet,
-  * the offer has enough memory and at least one CPU, the slave is not blacklisted and no
-  * executor is already running on it. Every other offer is declined with the same filters.
-  *
-  * @param d      driver used to answer the offers
-  * @param offers resource offers received from Mesos
-  */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
-   synchronized {
-     val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
-
-     for (offer <- offers) {
-       // Key the bookkeeping by the raw slave ID value. slaveLost and createCommand identify
-       // slaves by SlaveID.getValue; toString would yield the protobuf text form instead,
-       // which never matches those lookups.
-       val slaveId = offer.getSlaveId.getValue
-       val mem = getResource(offer.getResourcesList, "mem")
-       val cpus = getResource(offer.getResourcesList, "cpus").toInt
-       if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
-           failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
-           !slaveIdsWithExecutors.contains(slaveId)) {
-         // Launch an executor on the slave
-         val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
-         // Account for the cores right away so that maxCores is enforced for later offers;
-         // statusUpdate subtracts them again once the Mesos task finishes.
-         totalCoresAcquired += cpusToUse
-         val taskId = newMesosTaskId()
-         taskIdToSlaveId(taskId) = slaveId
-         slaveIdsWithExecutors += slaveId
-         coresByTaskId(taskId) = cpusToUse
-         val task = MesosTaskInfo.newBuilder()
-           .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
-           .setSlaveId(offer.getSlaveId)
-           .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
-           .setName("Task " + taskId)
-           .addResources(createResource("cpus", cpusToUse))
-           .addResources(createResource("mem", executorMemory))
-           .build()
-         d.launchTasks(offer.getId, Collections.singletonList(task), filters)
-       } else {
-         // Filter it out
-         d.launchTasks(offer.getId, Collections.emptyList[MesosTaskInfo](), filters)
-       }
-     }
-   }
- }
-
- /**
-  * Returns the scalar value of the first resource called `name` in a Mesos Resources list;
-  * throws IllegalArgumentException when no resource has that name.
-  */
- private def getResource(res: JList[Resource], name: String): Double = {
-   res.find(_.getName == name) match {
-     case Some(found) => found.getScalar.getValue
-     case None =>
-       // No resource with the required name was present
-       throw new IllegalArgumentException("No resource called " + name + " in " + res)
-   }
- }
-
- /** Build a Mesos resource protobuf object */
- private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
- Resource.newBuilder()
- .setName(resourceName)
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
- .build()
- }
-
- /** True when the Mesos task state is FINISHED, FAILED, KILLED or LOST. */
- private def isFinished(state: MesosTaskState) = {
-   state match {
-     case MesosTaskState.TASK_FINISHED |
-          MesosTaskState.TASK_FAILED |
-          MesosTaskState.TASK_KILLED |
-          MesosTaskState.TASK_LOST => true
-     case _ => false
-   }
- }
-
- /**
-  * Called by Mesos when a task changes state. For a finished task the slave is released, the
-  * cores remembered for the task are given back and, if the task failed or was lost, the
-  * failure is counted against the slave for blacklisting. Offers are then revived.
-  */
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
-   val mesosTaskId = status.getTaskId.getValue.toInt
-   val newState = status.getState
-   logInfo("Mesos task " + mesosTaskId + " is now " + newState)
-   synchronized {
-     if (isFinished(newState)) {
-       val slaveId = taskIdToSlaveId(mesosTaskId)
-       slaveIdsWithExecutors -= slaveId
-       taskIdToSlaveId -= mesosTaskId
-       // Forget the cores remembered for this task, if any were recorded
-       coresByTaskId.remove(mesosTaskId).foreach { cores =>
-         totalCoresAcquired -= cores
-       }
-       // A failed or lost task counts against the slave for blacklisting purposes
-       val failed =
-         newState == MesosTaskState.TASK_FAILED || newState == MesosTaskState.TASK_LOST
-       if (failed) {
-         val failures = failuresBySlaveId.getOrElse(slaveId, 0) + 1
-         failuresBySlaveId(slaveId) = failures
-         if (failures >= MAX_SLAVE_FAILURES) {
-           logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
-             "is Spark installed on it?")
-         }
-       }
-       driver.reviveOffers() // In case we'd rejected everything before but have now lost a node
-     }
-   }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
- }
-
- override def stop() {
- super.stop()
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
- // Note that the slave ID corresponds to the executor ID on that slave
- slaveIdsWithExecutors -= slaveId.getValue
- removeExecutor(slaveId.getValue, "Mesos slave lost")
- }
- }
- }
-
- override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
- slaveLost(d, s)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
deleted file mode 100644
index f6069a5..0000000
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler.mesos
-
-import com.google.protobuf.ByteString
-
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
-import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
-
-import spark.{SparkException, Utils, Logging, SparkContext}
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-import java.io.File
-import spark.scheduler.cluster._
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
-import spark.TaskState
-
-/**
- * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
- * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
- * from multiple apps can run on different cores) and in time (a core can switch ownership).
- */
-private[spark] class MesosSchedulerBackend(
- scheduler: ClusterScheduler,
- sc: SparkContext,
- master: String,
- appName: String)
- extends SchedulerBackend
- with MScheduler
- with Logging {
-
- // Lock used to wait for scheduler to be registered
- var isRegistered = false
- val registeredLock = new Object()
-
- // Driver for talking to Mesos
- var driver: SchedulerDriver = null
-
- // Which slave IDs we have executors on
- val slaveIdsWithExecutors = new HashSet[String]
- val taskIdToSlaveId = new HashMap[Long, String]
-
- // An ExecutorInfo for our tasks
- var execArgs: Array[Byte] = null
-
- var classLoader: ClassLoader = null
-
- override def start() {
- synchronized {
- classLoader = Thread.currentThread.getContextClassLoader
-
- new Thread("MesosSchedulerBackend driver") {
- setDaemon(true)
- override def run() {
- val scheduler = MesosSchedulerBackend.this
- val fwInfo = FrameworkInfo.newBuilder().setUser("").setName(appName).build()
- driver = new MesosSchedulerDriver(scheduler, fwInfo, master)
- try {
- val ret = driver.run()
- logInfo("driver.run() returned with code " + ret)
- } catch {
- case e: Exception => logError("driver.run() failed", e)
- }
- }
- }.start()
-
- waitForRegister()
- }
- }
-
- def createExecutorInfo(execId: String): ExecutorInfo = {
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
- val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case (key, value) =>
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
- val command = CommandInfo.newBuilder()
- .setEnvironment(environment)
- val uri = System.getProperty("spark.executor.uri")
- if (uri == null) {
- command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath)
- } else {
- // Grab everything to the first '.'. We'll use that and '*' to
- // glob the directory "correctly".
- val basename = uri.split('/').last.split('.').head
- command.setValue("cd %s*; ./spark-executor".format(basename))
- command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
- }
- val memory = Resource.newBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(executorMemory).build())
- .build()
- ExecutorInfo.newBuilder()
- .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
- .setCommand(command)
- .setData(ByteString.copyFrom(createExecArg()))
- .addResources(memory)
- .build()
- }
-
- /**
- * Create and serialize the executor argument to pass to Mesos. Our executor arg is an array
- * containing all the spark.* system properties in the form of (String, String) pairs.
- */
- private def createExecArg(): Array[Byte] = {
- if (execArgs == null) {
- val props = new HashMap[String, String]
- val iterator = System.getProperties.entrySet.iterator
- while (iterator.hasNext) {
- val entry = iterator.next
- val (key, value) = (entry.getKey.toString, entry.getValue.toString)
- if (key.startsWith("spark.")) {
- props(key) = value
- }
- }
- // Serialize the map as an array of (String, String) pairs
- execArgs = Utils.serialize(props.toArray)
- }
- return execArgs
- }
-
- private def setClassLoader(): ClassLoader = {
- val oldClassLoader = Thread.currentThread.getContextClassLoader
- Thread.currentThread.setContextClassLoader(classLoader)
- return oldClassLoader
- }
-
- private def restoreClassLoader(oldClassLoader: ClassLoader) {
- Thread.currentThread.setContextClassLoader(oldClassLoader)
- }
-
- override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
-
- override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- def waitForRegister() {
- registeredLock.synchronized {
- while (!isRegistered) {
- registeredLock.wait()
- }
- }
- }
-
- override def disconnected(d: SchedulerDriver) {}
-
- override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
-
- /**
- * Method called by Mesos to offer resources on slaves. We resond by asking our active task sets
- * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
- * tasks are balanced across the cluster.
- */
- override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- val oldClassLoader = setClassLoader()
- try {
- synchronized {
- // Build a big list of the offerable workers, and remember their indices so that we can
- // figure out which Offer to reply to for each worker
- val offerableIndices = new ArrayBuffer[Int]
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
- def enoughMemory(o: Offer) = {
- val mem = getResource(o.getResourcesList, "mem")
- val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
- }
-
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices += index
- offerableWorkers += new WorkerOffer(
- offer.getSlaveId.getValue,
- offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
- }
-
- // Call into the ClusterScheduler
- val taskLists = scheduler.resourceOffers(offerableWorkers)
-
- // Build a list of Mesos tasks for each slave
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
- for ((taskList, index) <- taskLists.zipWithIndex) {
- if (!taskList.isEmpty) {
- val offerNum = offerableIndices(index)
- val slaveId = offers(offerNum).getSlaveId.getValue
- slaveIdsWithExecutors += slaveId
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
- for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
- }
- }
- }
-
- // Reply to the offers
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
- }
- }
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- /** Helper function to pull out a resource from a Mesos Resources protobuf */
- def getResource(res: JList[Resource], name: String): Double = {
- for (r <- res if r.getName == name) {
- return r.getScalar.getValue
- }
- // If we reached here, no resource with the required name was present
- throw new IllegalArgumentException("No resource called " + name + " in " + res)
- }
-
- /** Turn a Spark TaskDescription into a Mesos task */
- def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
- val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
- val cpuResource = Resource.newBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(1).build())
- .build()
- return MesosTaskInfo.newBuilder()
- .setTaskId(taskId)
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setExecutor(createExecutorInfo(slaveId))
- .setName(task.name)
- .addResources(cpuResource)
- .setData(ByteString.copyFrom(task.serializedTask))
- .build()
- }
-
- /** Check whether a Mesos task state represents a finished task */
- def isFinished(state: MesosTaskState) = {
- state == MesosTaskState.TASK_FINISHED ||
- state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
- state == MesosTaskState.TASK_LOST
- }
-
- override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val oldClassLoader = setClassLoader()
- try {
- val tid = status.getTaskId.getValue.toLong
- val state = TaskState.fromMesos(status.getState)
- synchronized {
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
- }
- if (isFinished(status.getState)) {
- taskIdToSlaveId.remove(tid)
- }
- }
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def error(d: SchedulerDriver, message: String) {
- val oldClassLoader = setClassLoader()
- try {
- logError("Mesos error: " + message)
- scheduler.error(message)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def stop() {
- if (driver != null) {
- driver.stop()
- }
- }
-
- override def reviveOffers() {
- driver.reviveOffers()
- }
-
- override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
-
- private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
- val oldClassLoader = setClassLoader()
- try {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
- }
- scheduler.executorLost(slaveId.getValue, reason)
- } finally {
- restoreClassLoader(oldClassLoader)
- }
- }
-
- override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {
- recordSlaveLost(d, slaveId, SlaveLost())
- }
-
- override def executorLost(d: SchedulerDriver, executorId: ExecutorID,
- slaveId: SlaveID, status: Int) {
- logInfo("Executor lost: %s, marking slave %s as lost".format(executorId.getValue,
- slaveId.getValue))
- recordSlaveLost(d, slaveId, ExecutorExited(status))
- }
-
- // TODO: query Mesos for number of cores
- override def defaultParallelism() = System.getProperty("spark.default.parallelism", "8").toInt
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/serializer/Serializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/serializer/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
deleted file mode 100644
index dc94d42..0000000
--- a/core/src/main/scala/spark/serializer/Serializer.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.serializer
-
-import java.io.{EOFException, InputStream, OutputStream}
-import java.nio.ByteBuffer
-
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
-import spark.util.ByteBufferInputStream
-
-
-/**
- * A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create [[spark.serializer.SerializerInstance]] objects that do the actual serialization and are
- * guaranteed to only be called from one thread at a time.
- */
-trait Serializer {
- def newInstance(): SerializerInstance
-}
-
-
-/**
- * An instance of a serializer, for use by one thread at a time.
- */
-trait SerializerInstance {
- def serialize[T](t: T): ByteBuffer
-
- def deserialize[T](bytes: ByteBuffer): T
-
- def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T
-
- def serializeStream(s: OutputStream): SerializationStream
-
- def deserializeStream(s: InputStream): DeserializationStream
-
- def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
- // Default implementation uses serializeStream
- val stream = new FastByteArrayOutputStream()
- serializeStream(stream).writeAll(iterator)
- val buffer = ByteBuffer.allocate(stream.position.toInt)
- buffer.put(stream.array, 0, stream.position.toInt)
- buffer.flip()
- buffer
- }
-
- def deserializeMany(buffer: ByteBuffer): Iterator[Any] = {
- // Default implementation uses deserializeStream
- buffer.rewind()
- deserializeStream(new ByteBufferInputStream(buffer)).asIterator
- }
-}
-
-
-/**
- * A stream for writing serialized objects.
- */
-trait SerializationStream {
- def writeObject[T](t: T): SerializationStream
- def flush(): Unit
- def close(): Unit
-
- def writeAll[T](iter: Iterator[T]): SerializationStream = {
- while (iter.hasNext) {
- writeObject(iter.next())
- }
- this
- }
-}
-
-
-/**
- * A stream for reading serialized objects.
- */
-trait DeserializationStream {
- def readObject[T](): T
- def close(): Unit
-
- /**
- * Read the elements of this stream through an iterator. This can only be called once, as
- * reading each element will consume data from the input source.
- */
- def asIterator: Iterator[Any] = new spark.util.NextIterator[Any] {
- override protected def getNext() = {
- try {
- readObject[Any]()
- } catch {
- case eof: EOFException =>
- finished = true
- }
- }
-
- override protected def close() {
- DeserializationStream.this.close()
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/serializer/SerializerManager.scala b/core/src/main/scala/spark/serializer/SerializerManager.scala
deleted file mode 100644
index b7b2470..0000000
--- a/core/src/main/scala/spark/serializer/SerializerManager.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.serializer
-
-import java.util.concurrent.ConcurrentHashMap
-
-
-/**
- * A service that returns a serializer object given the serializer's class name. If a previous
- * instance of the serializer object has been created, the get method returns that instead of
- * creating a new one.
- */
-private[spark] class SerializerManager {
-
- private val serializers = new ConcurrentHashMap[String, Serializer]
- private var _default: Serializer = _
-
- def default = _default
-
- def setDefault(clsName: String): Serializer = {
- _default = get(clsName)
- _default
- }
-
- def get(clsName: String): Serializer = {
- if (clsName == null) {
- default
- } else {
- var serializer = serializers.get(clsName)
- if (serializer != null) {
- // If the serializer has been created previously, reuse that.
- serializer
- } else this.synchronized {
- // Otherwise, create a new one. But make sure no other thread has attempted
- // to create another new one at the same time.
- serializer = serializers.get(clsName)
- if (serializer == null) {
- val clsLoader = Thread.currentThread.getContextClassLoader
- serializer =
- Class.forName(clsName, true, clsLoader).newInstance().asInstanceOf[Serializer]
- serializers.put(clsName, serializer)
- }
- serializer
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockException.scala b/core/src/main/scala/spark/storage/BlockException.scala
deleted file mode 100644
index 8ebfaf3..0000000
--- a/core/src/main/scala/spark/storage/BlockException.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-private[spark]
-case class BlockException(blockId: String, message: String) extends Exception(message)
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockFetchTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockFetchTracker.scala b/core/src/main/scala/spark/storage/BlockFetchTracker.scala
deleted file mode 100644
index 265e554..0000000
--- a/core/src/main/scala/spark/storage/BlockFetchTracker.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-private[spark] trait BlockFetchTracker {
- def totalBlocks : Int
- def numLocalBlocks: Int
- def numRemoteBlocks: Int
- def remoteFetchTime : Long
- def fetchWaitTime: Long
- def remoteBytesRead : Long
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
deleted file mode 100644
index 568783d..0000000
--- a/core/src/main/scala/spark/storage/BlockFetcherIterator.scala
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.storage
-
-import java.nio.ByteBuffer
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.Queue
-
-import io.netty.buffer.ByteBuf
-
-import spark.Logging
-import spark.Utils
-import spark.SparkException
-import spark.network.BufferMessage
-import spark.network.ConnectionManagerId
-import spark.network.netty.ShuffleCopier
-import spark.serializer.Serializer
-
-
-/**
- * A block fetcher iterator interface. There are two implementations:
- *
- * BasicBlockFetcherIterator: uses a custom-built NIO communication layer.
- * NettyBlockFetcherIterator: uses Netty (OIO) as the communication layer.
- *
- * Eventually we would like the two to converge and use a single NIO-based communication layer,
- * but extensive tests show that under some circumstances (e.g. large shuffles with lots of cores),
- * NIO would perform poorly and thus the need for the Netty OIO one.
- */
-
-private[storage]
-trait BlockFetcherIterator extends Iterator[(String, Option[Iterator[Any]])]
- with Logging with BlockFetchTracker {
- def initialize()
-}
-
-
-private[storage]
-object BlockFetcherIterator {
-
- // A request to fetch one or more blocks, complete with their sizes
- class FetchRequest(val address: BlockManagerId, val blocks: Seq[(String, Long)]) {
- val size = blocks.map(_._2).sum
- }
-
- // A result of a fetch. Includes the block ID, size in bytes, and a function to deserialize
- // the block (since we want all deserializaton to happen in the calling thread); can also
- // represent a fetch failure if size == -1.
- class FetchResult(val blockId: String, val size: Long, val deserialize: () => Iterator[Any]) {
- def failed: Boolean = size == -1
- }
-
- class BasicBlockFetcherIterator(
- private val blockManager: BlockManager,
- val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
- serializer: Serializer)
- extends BlockFetcherIterator {
-
- import blockManager._
-
- private var _remoteBytesRead = 0l
- private var _remoteFetchTime = 0l
- private var _fetchWaitTime = 0l
-
- if (blocksByAddress == null) {
- throw new IllegalArgumentException("BlocksByAddress is null")
- }
-
- // Total number blocks fetched (local + remote). Also number of FetchResults expected
- protected var _numBlocksToFetch = 0
-
- protected var startTime = System.currentTimeMillis
-
- // This represents the number of local blocks, also counting zero-sized blocks
- private var numLocal = 0
- // BlockIds for local blocks that need to be fetched. Excludes zero-sized blocks
- protected val localBlocksToFetch = new ArrayBuffer[String]()
-
- // This represents the number of remote blocks, also counting zero-sized blocks
- private var numRemote = 0
- // BlockIds for remote blocks that need to be fetched. Excludes zero-sized blocks
- protected val remoteBlocksToFetch = new HashSet[String]()
-
- // A queue to hold our results.
- protected val results = new LinkedBlockingQueue[FetchResult]
-
- // Queue of fetch requests to issue; we'll pull requests off this gradually to make sure that
- // the number of bytes in flight is limited to maxBytesInFlight
- private val fetchRequests = new Queue[FetchRequest]
-
- // Current bytes in flight from our requests
- private var bytesInFlight = 0L
-
- protected def sendRequest(req: FetchRequest) {
- logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.bytesToString(req.size), req.address.hostPort))
- val cmId = new ConnectionManagerId(req.address.host, req.address.port)
- val blockMessageArray = new BlockMessageArray(req.blocks.map {
- case (blockId, size) => BlockMessage.fromGetBlock(GetBlock(blockId))
- })
- bytesInFlight += req.size
- val sizeMap = req.blocks.toMap // so we can look up the size of each blockID
- val fetchStart = System.currentTimeMillis()
- val future = connectionManager.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
- future.onSuccess {
- case Some(message) => {
- val fetchDone = System.currentTimeMillis()
- _remoteFetchTime += fetchDone - fetchStart
- val bufferMessage = message.asInstanceOf[BufferMessage]
- val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
- for (blockMessage <- blockMessageArray) {
- if (blockMessage.getType != BlockMessage.TYPE_GOT_BLOCK) {
- throw new SparkException(
- "Unexpected message " + blockMessage.getType + " received from " + cmId)
- }
- val blockId = blockMessage.getId
- val networkSize = blockMessage.getData.limit()
- results.put(new FetchResult(blockId, sizeMap(blockId),
- () => dataDeserialize(blockId, blockMessage.getData, serializer)))
- _remoteBytesRead += networkSize
- logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
- }
- }
- case None => {
- logError("Could not get block(s) from " + cmId)
- for ((blockId, size) <- req.blocks) {
- results.put(new FetchResult(blockId, -1, null))
- }
- }
- }
- }
-
- protected def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
- // Split local and remote blocks. Remote blocks are further split into FetchRequests of size
- // at most maxBytesInFlight in order to limit the amount of data in flight.
- val remoteRequests = new ArrayBuffer[FetchRequest]
- for ((address, blockInfos) <- blocksByAddress) {
- if (address == blockManagerId) {
- numLocal = blockInfos.size
- // Filter out zero-sized blocks
- localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1)
- _numBlocksToFetch += localBlocksToFetch.size
- } else {
- numRemote += blockInfos.size
- // Make our requests at least maxBytesInFlight / 5 in length; the reason to keep them
- // smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
- // nodes, rather than blocking on reading output from one node.
- val minRequestSize = math.max(maxBytesInFlight / 5, 1L)
- logInfo("maxBytesInFlight: " + maxBytesInFlight + ", minRequest: " + minRequestSize)
- val iterator = blockInfos.iterator
- var curRequestSize = 0L
- var curBlocks = new ArrayBuffer[(String, Long)]
- while (iterator.hasNext) {
- val (blockId, size) = iterator.next()
- // Skip empty blocks
- if (size > 0) {
- curBlocks += ((blockId, size))
- remoteBlocksToFetch += blockId
- _numBlocksToFetch += 1
- curRequestSize += size
- } else if (size < 0) {
- throw new BlockException(blockId, "Negative block size " + size)
- }
- if (curRequestSize >= minRequestSize) {
- // Add this FetchRequest
- remoteRequests += new FetchRequest(address, curBlocks)
- curRequestSize = 0
- curBlocks = new ArrayBuffer[(String, Long)]
- }
- }
- // Add in the final request
- if (!curBlocks.isEmpty) {
- remoteRequests += new FetchRequest(address, curBlocks)
- }
- }
- }
- logInfo("Getting " + _numBlocksToFetch + " non-zero-bytes blocks out of " +
- totalBlocks + " blocks")
- remoteRequests
- }
-
- protected def getLocalBlocks() {
- // Get the local blocks while remote blocks are being fetched. Note that it's okay to do
- // these all at once because they will just memory-map some files, so they won't consume
- // any memory that might exceed our maxBytesInFlight
- for (id <- localBlocksToFetch) {
- getLocalFromDisk(id, serializer) match {
- case Some(iter) => {
- // Pass 0 as size since it's not in flight
- results.put(new FetchResult(id, 0, () => iter))
- logDebug("Got local block " + id)
- }
- case None => {
- throw new BlockException(id, "Could not get block " + id + " from local machine")
- }
- }
- }
- }
-
- override def initialize() {
- // Split local and remote blocks.
- val remoteRequests = splitLocalRemoteBlocks()
- // Add the remote requests into our queue in a random order
- fetchRequests ++= Utils.randomize(remoteRequests)
-
- // Send out initial requests for blocks, up to our maxBytesInFlight
- while (!fetchRequests.isEmpty &&
- (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
- sendRequest(fetchRequests.dequeue())
- }
-
- val numGets = remoteRequests.size - fetchRequests.size
- logInfo("Started " + numGets + " remote gets in " + Utils.getUsedTimeMs(startTime))
-
- // Get Local Blocks
- startTime = System.currentTimeMillis
- getLocalBlocks()
- logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
- }
-
- //an iterator that will read fetched blocks off the queue as they arrive.
- @volatile protected var resultsGotten = 0
-
- override def hasNext: Boolean = resultsGotten < _numBlocksToFetch
-
- override def next(): (String, Option[Iterator[Any]]) = {
- resultsGotten += 1
- val startFetchWait = System.currentTimeMillis()
- val result = results.take()
- val stopFetchWait = System.currentTimeMillis()
- _fetchWaitTime += (stopFetchWait - startFetchWait)
- if (! result.failed) bytesInFlight -= result.size
- while (!fetchRequests.isEmpty &&
- (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
- sendRequest(fetchRequests.dequeue())
- }
- (result.blockId, if (result.failed) None else Some(result.deserialize()))
- }
-
- // Implementing BlockFetchTracker trait.
- override def totalBlocks: Int = numLocal + numRemote
- override def numLocalBlocks: Int = numLocal
- override def numRemoteBlocks: Int = numRemote
- override def remoteFetchTime: Long = _remoteFetchTime
- override def fetchWaitTime: Long = _fetchWaitTime
- override def remoteBytesRead: Long = _remoteBytesRead
- }
- // End of BasicBlockFetcherIterator
-
- class NettyBlockFetcherIterator(
- blockManager: BlockManager,
- blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])],
- serializer: Serializer)
- extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer) {
-
- import blockManager._
-
- val fetchRequestsSync = new LinkedBlockingQueue[FetchRequest]
-
- private def startCopiers(numCopiers: Int): List[_ <: Thread] = {
- (for ( i <- Range(0,numCopiers) ) yield {
- val copier = new Thread {
- override def run(){
- try {
- while(!isInterrupted && !fetchRequestsSync.isEmpty) {
- sendRequest(fetchRequestsSync.take())
- }
- } catch {
- case x: InterruptedException => logInfo("Copier Interrupted")
- //case _ => throw new SparkException("Exception Throw in Shuffle Copier")
- }
- }
- }
- copier.start
- copier
- }).toList
- }
-
- // keep this to interrupt the threads when necessary
- private def stopCopiers() {
- for (copier <- copiers) {
- copier.interrupt()
- }
- }
-
- override protected def sendRequest(req: FetchRequest) {
-
- def putResult(blockId: String, blockSize: Long, blockData: ByteBuf) {
- val fetchResult = new FetchResult(blockId, blockSize,
- () => dataDeserialize(blockId, blockData.nioBuffer, serializer))
- results.put(fetchResult)
- }
-
- logDebug("Sending request for %d blocks (%s) from %s".format(
- req.blocks.size, Utils.bytesToString(req.size), req.address.host))
- val cmId = new ConnectionManagerId(req.address.host, req.address.nettyPort)
- val cpier = new ShuffleCopier
- cpier.getBlocks(cmId, req.blocks, putResult)
- logDebug("Sent request for remote blocks " + req.blocks + " from " + req.address.host )
- }
-
- private var copiers: List[_ <: Thread] = null
-
- override def initialize() {
- // Split Local Remote Blocks and set numBlocksToFetch
- val remoteRequests = splitLocalRemoteBlocks()
- // Add the remote requests into our queue in a random order
- for (request <- Utils.randomize(remoteRequests)) {
- fetchRequestsSync.put(request)
- }
-
- copiers = startCopiers(System.getProperty("spark.shuffle.copier.threads", "6").toInt)
- logInfo("Started " + fetchRequestsSync.size + " remote gets in " +
- Utils.getUsedTimeMs(startTime))
-
- // Get Local Blocks
- startTime = System.currentTimeMillis
- getLocalBlocks()
- logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
- }
-
- override def next(): (String, Option[Iterator[Any]]) = {
- resultsGotten += 1
- val result = results.take()
- // If all the results has been retrieved, copiers will exit automatically
- (result.blockId, if (result.failed) None else Some(result.deserialize()))
- }
- }
- // End of NettyBlockFetcherIterator
-}