Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:19 UTC

[35/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
new file mode 100644
index 0000000..5b07933
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection.mutable.Map
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+
+/**
+ * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
+ * architecture where any thread can post an event (e.g. a task finishing or a new job being
+ * submitted) but there is a single "logic" thread that reads these events and takes decisions.
+ * This greatly simplifies synchronization.
+ */
+private[spark] sealed trait DAGSchedulerEvent
+
+private[spark] case class JobSubmitted(
+    finalRDD: RDD[_],
+    func: (TaskContext, Iterator[_]) => _,
+    partitions: Array[Int],
+    allowLocal: Boolean,
+    callSite: String,
+    listener: JobListener,
+    properties: Properties = null)
+  extends DAGSchedulerEvent
+
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo) extends DAGSchedulerEvent
+
+private[spark] case class CompletionEvent(
+    task: Task[_],
+    reason: TaskEndReason,
+    result: Any,
+    accumUpdates: Map[Long, Any],
+    taskInfo: TaskInfo,
+    taskMetrics: TaskMetrics)
+  extends DAGSchedulerEvent
+
+private[spark] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
+
+private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
+
+private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent
+
+private[spark] case object StopDAGScheduler extends DAGSchedulerEvent
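
For illustration, a minimal sketch (not part of this commit) of how a consumer of the
event queue might dispatch on these event types; the handler name and println output
are made up:

    def handleEvent(event: DAGSchedulerEvent) {
      event match {
        case JobSubmitted(finalRDD, _, partitions, _, callSite, _, _) =>
          println("job submitted at " + callSite + " over " + partitions.length + " partitions")
        case CompletionEvent(task, reason, _, _, _, _) =>
          println("task " + task + " completed with reason " + reason)
        case ExecutorLost(execId) =>
          println("executor lost: " + execId)
        case StopDAGScheduler =>
          println("scheduler shutting down")
        case _ =>  // BeginEvent, ExecutorGained, TaskSetFailed, ...
      }
    }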

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000..ce0dc90
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.scheduler
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "DAGScheduler"
+
+  metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.failed.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.running.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.waiting.size
+  })
+
+  metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.nextJobId.get()
+  })
+
+  metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
+    override def getValue: Int = dagScheduler.activeJobs.size
+  })
+}
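
As an aside, a hedged sketch of how these gauges could be inspected locally with a
Codahale ConsoleReporter; in Spark the Source is normally registered with the
MetricsSystem instead, and `source` here is assumed to be an already-constructed
DAGSchedulerSource:

    import java.util.concurrent.TimeUnit
    import com.codahale.metrics.ConsoleReporter

    val reporter = ConsoleReporter.forRegistry(source.metricRegistry)
      .convertRatesTo(TimeUnit.SECONDS)
      .convertDurationsTo(TimeUnit.MILLISECONDS)
      .build()
    reporter.start(10, TimeUnit.SECONDS)  // print gauge values every 10 seconds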

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
new file mode 100644
index 0000000..370ccd1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{Logging, SparkEnv}
+import scala.collection.immutable.Set
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.conf.Configuration
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+
+/**
+ * Parses and holds information about the input format (and input files) specified as a parameter.
+ */
+class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_], 
+                      val path: String) extends Logging {
+
+  var mapreduceInputFormat: Boolean = false
+  var mapredInputFormat: Boolean = false
+
+  validate()
+
+  override def toString(): String = {
+    "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
+  }
+
+  override def hashCode(): Int = {
+    var hashCode = inputFormatClazz.hashCode
+    hashCode = hashCode * 31 + path.hashCode
+    hashCode
+  }
+
+  // Since we do not canonicalize the path, this can be wrong (e.g. relative vs. absolute path),
+  // which is fine: this is only a best-effort attempt to remove duplicates.
+  override def equals(other: Any): Boolean = other match {
+    case that: InputFormatInfo => {
+      // Not checking the configuration here; class and path should be enough.
+      this.inputFormatClazz == that.inputFormatClazz &&
+        this.path == that.path
+    }
+    case _ => false
+  }
+
+  private def validate() {
+    logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path  " + path)
+
+    try {
+      if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+        logDebug("inputformat is from mapreduce package")
+        mapreduceInputFormat = true
+      }
+      else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+        logDebug("inputformat is from mapred package")
+        mapredInputFormat = true
+      }
+      else {
+        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
+          " is NOT a supported input format ? does not implement either of the supported hadoop api's")
+      }
+    }
+    catch {
+      case e: ClassNotFoundException => {
+        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found", e)
+      }
+    }
+  }
+
+
+  // This method does not expect failures, since validate has already passed ...
+  private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
+    val env = SparkEnv.get
+    val conf = new JobConf(configuration)
+    env.hadoop.addCredentials(conf)
+    FileInputFormat.setInputPaths(conf, path)
+
+    val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
+      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
+        org.apache.hadoop.mapreduce.InputFormat[_, _]]
+    val job = new Job(conf)
+
+    val retval = new ArrayBuffer[SplitInfo]()
+    val list = instance.getSplits(job)
+    for (split <- list) {
+      retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
+    }
+
+    return retval.toSet
+  }
+
+  // This method does not expect failures, since validate has already passed ...
+  private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
+    val env = SparkEnv.get
+    val jobConf = new JobConf(configuration)
+    env.hadoop.addCredentials(jobConf)
+    FileInputFormat.setInputPaths(jobConf, path)
+
+    val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
+      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
+        org.apache.hadoop.mapred.InputFormat[_, _]]
+
+    val retval = new ArrayBuffer[SplitInfo]()
+    instance.getSplits(jobConf, jobConf.getNumMapTasks()).foreach(
+        elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
+    )
+
+    return retval.toSet
+   }
+
+  private def findPreferredLocations(): Set[SplitInfo] = {
+    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + 
+      ", inputFormatClazz : " + inputFormatClazz)
+    if (mapreduceInputFormat) {
+      return prefLocsFromMapreduceInputFormat()
+    }
+    else {
+      assert(mapredInputFormat)
+      return prefLocsFromMapredInputFormat()
+    }
+  }
+}
+
+
+
+
+object InputFormatInfo {
+  /**
+    Computes the preferred locations based on the input(s) and returns a host-to-splits map.
+    Typical use of this method for container allocation would follow an algorithm like this
+    (which is what we currently do in the YARN branch):
+    a) For each host, count the number of splits hosted on that host.
+    b) Decrement that count by the number of containers currently allocated on that host.
+    c) Compute rack info for each host and update the rack -> count map based on (b).
+    d) Allocate nodes based on (c).
+    e) On the allocation result, ensure that we don't allocate "too many" jobs on a single node
+       (even if data locality on that node is very high): this prevents the job from becoming
+       fragile if a single host (or a small set of hosts) goes down.
+
+    Repeat from (a) until the required number of nodes has been allocated.
+
+    If a node 'dies', follow the same procedure.
+  */
+  def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
+
+    val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
+    for (inputSplit <- formats) {
+      val splits = inputSplit.findPreferredLocations()
+
+      for (split <- splits){
+        val location = split.hostLocation
+        val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo])
+        set += split
+      }
+    }
+
+    nodeToSplit
+  }
+}
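
A minimal usage sketch of the host-to-splits map computed above (the path and input
format are hypothetical, and a running SparkEnv is assumed since the preferred-location
lookup goes through it):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapred.TextInputFormat

    val info = new InputFormatInfo(new Configuration(), classOf[TextInputFormat], "/data/events")
    val hostToSplits = InputFormatInfo.computePreferredLocations(Seq(info))
    for ((host, splits) <- hostToSplits) {
      println(host + " hosts " + splits.size + " splits")
    }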

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
new file mode 100644
index 0000000..50c2b9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Interface used to listen for job completion or failure events after submitting a job to the
+ * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
+ * job fails (and no further taskSucceeded events will happen).
+ */
+private[spark] trait JobListener {
+  def taskSucceeded(index: Int, result: Any)
+  def jobFailed(exception: Exception)
+}
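
A minimal sketch of an implementation (hypothetical, for illustration only; JobWaiter,
added later in this commit, is the real in-tree implementation):

    // Collects task results by partition index and remembers a failure, if any.
    class CollectingJobListener(numTasks: Int) extends JobListener {
      val results = new Array[Any](numTasks)
      @volatile var failure: Option[Exception] = None

      override def taskSucceeded(index: Int, result: Any) {
        results(index) = result
      }

      override def jobFailed(exception: Exception) {
        failure = Some(exception)
      }
    }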

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
new file mode 100644
index 0000000..98ef4d1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+import java.io.File
+import java.io.FileNotFoundException
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.{Map, HashMap, ListBuffer}
+import scala.io.Source
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.cluster.TaskInfo
+
+// Used to record runtime information for each job, including the RDD graph,
+// tasks' start/stop times, shuffle information, and information supplied from outside.
+
+class JobLogger(val logDirName: String) extends SparkListener with Logging {
+  private val logDir =  
+    if (System.getenv("SPARK_LOG_DIR") != null)  
+      System.getenv("SPARK_LOG_DIR")
+    else 
+      "/tmp/spark"
+  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] 
+  private val stageIDToJobID = new HashMap[Int, Int]
+  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+  
+  createLogDir()
+  def this() = this(String.valueOf(System.currentTimeMillis()))
+  
+  def getLogDir = logDir
+  def getJobIDtoPrintWriter = jobIDToPrintWriter
+  def getStageIDToJobID = stageIDToJobID
+  def getJobIDToStages = jobIDToStages
+  def getEventQueue = eventQueue
+  
+  // Create a folder for log files; the folder's name is the creation time of the JobLogger.
+  protected def createLogDir() {
+    val dir = new File(logDir + "/" + logDirName + "/")
+    if (dir.exists()) {
+      return
+    }
+    if (dir.mkdirs() == false) {
+      logError("create log directory error:" + logDir + "/" + logDirName + "/")
+    }
+  }
+  
+  // Create a log file for one job; the file name is the jobID.
+  protected def createLogWriter(jobID: Int) {
+    try{
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
+      jobIDToPrintWriter += (jobID -> fileWriter)
+      } catch {
+        case e: FileNotFoundException => e.printStackTrace()
+      }
+  }
+  
+  // Close the log file and clean up the stage-to-job mapping in stageIDToJobID.
+  protected def closeLogWriter(jobID: Int) = 
+    jobIDToPrintWriter.get(jobID).foreach { fileWriter => 
+      fileWriter.close()
+      jobIDToStages.get(jobID).foreach(_.foreach{ stage => 
+        stageIDToJobID -= stage.id
+      })
+      jobIDToPrintWriter -= jobID
+      jobIDToStages -= jobID
+    }
+  
+  // Write log information to the log file; the withTime parameter controls whether to record
+  // a timestamp with the information.
+  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+    var writeInfo = info
+    if (withTime) {
+      val date = new Date(System.currentTimeMillis())
+      writeInfo = DATE_FORMAT.format(date) + ": " +info
+    }
+    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+  }
+  
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) = 
+    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+
+  protected def buildJobDep(jobID: Int, stage: Stage) {
+    if (stage.jobId == jobID) {
+      jobIDToStages.get(jobID) match {
+        case Some(stageList) => stageList += stage
+        case None => val stageList = new  ListBuffer[Stage]
+                     stageList += stage
+                     jobIDToStages += (jobID -> stageList)
+      }
+      stageIDToJobID += (stage.id -> jobID)
+      stage.parents.foreach(buildJobDep(jobID, _))
+    }
+  }
+
+  protected def recordStageDep(jobID: Int) {
+    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
+      var rddList = new ListBuffer[RDD[_]]
+      rddList += rdd
+      rdd.dependencies.foreach{ dep => dep match {
+          case shufDep: ShuffleDependency[_,_] =>
+          case _ => rddList ++= getRddsInStage(dep.rdd)
+        }
+      }
+      rddList
+    }
+    jobIDToStages.get(jobID).foreach {_.foreach { stage => 
+        var depRddDesc: String = ""
+        getRddsInStage(stage.rdd).foreach { rdd => 
+          depRddDesc += rdd.id + ","
+        }
+        var depStageDesc: String = ""
+        stage.parents.foreach { stage => 
+          depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
+        }
+        jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + 
+                   depRddDesc.substring(0, depRddDesc.length - 1) + ")" + 
+                   " STAGE_DEP=" + depStageDesc, false)
+      }
+    }
+  }
+  
+  // Generate an indentation string of the given width.
+  protected def indentString(indent: Int) = {
+    val sb = new StringBuilder()
+    for (i <- 1 to indent) {
+      sb.append(" ")
+    }
+    sb.toString()
+  }
+  
+  protected def getRddName(rdd: RDD[_]) = {
+    var rddName = rdd.getClass.getName
+    if (rdd.name != null) {
+      rddName = rdd.name 
+    }
+    rddName
+  }
+  
+  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
+    rdd.dependencies.foreach{ dep => dep match {
+        case shufDep: ShuffleDependency[_,_] => 
+          val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+          jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+        case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
+      }
+    }
+  }
+  
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+    var stageInfo: String = ""
+    if (stage.isShuffleMap) {
+      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + 
+                  stage.shuffleDep.get.shuffleId
+    } else {
+      stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
+    }
+    if (stage.jobId == jobID) {
+      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
+      recordRddInStageGraph(jobID, stage.rdd, indent)
+      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+    } else
+      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
+  }
+  
+  // Record task metrics into job log files
+  protected def recordTaskMetrics(stageID: Int, status: String, 
+                                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + 
+               " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + 
+               " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
+    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
+    val readMetrics = 
+      taskMetrics.shuffleReadMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + 
+          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + 
+          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + 
+          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + 
+          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + 
+          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + 
+          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+        case None => ""
+      }
+    val writeMetrics = 
+      taskMetrics.shuffleWriteMetrics match {
+        case Some(metrics) => 
+          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+        case None => ""
+      }
+    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+  }
+  
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+    stageLogInfo(
+      stageSubmitted.stage.id,
+      "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+        stageSubmitted.stage.id, stageSubmitted.taskSize))
+  }
+  
+  override def onStageCompleted(stageCompleted: StageCompleted) {
+    stageLogInfo(
+      stageCompleted.stageInfo.stage.id,
+      "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
+    
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) { }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val task = taskEnd.task
+    val taskInfo = taskEnd.taskInfo
+    var taskStatus = ""
+    task match {
+      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
+      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
+    }
+    taskEnd.reason match {
+      case Success => taskStatus += " STATUS=SUCCESS"
+        recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
+      case Resubmitted => 
+        taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + 
+                      " STAGE_ID=" + task.stageId
+        stageLogInfo(task.stageId, taskStatus)
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => 
+        taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + 
+                      task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + 
+                      mapId + " REDUCE_ID=" + reduceId
+        stageLogInfo(task.stageId, taskStatus)
+      case OtherFailure(message) => 
+        taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + 
+                      " STAGE_ID=" + task.stageId + " INFO=" + message
+        stageLogInfo(task.stageId, taskStatus)
+      case _ =>
+    }
+  }
+  
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    val job = jobEnd.job
+    var info = "JOB_ID=" + job.jobId
+    jobEnd.jobResult match {
+      case JobSucceeded => info += " STATUS=SUCCESS"
+      case JobFailed(exception, _) =>
+        info += " STATUS=FAILED REASON="
+        exception.getMessage.split("\\s+").foreach(info += _ + "_")
+      case _ =>
+    }
+    jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
+    closeLogWriter(job.jobId)
+  }
+
+  protected def recordJobProperties(jobID: Int, properties: Properties) {
+    if(properties != null) {
+      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+      jobLogInfo(jobID, description, false)
+    }
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {
+    val job = jobStart.job
+    val properties = jobStart.properties
+    createLogWriter(job.jobId)
+    recordJobProperties(job.jobId, properties)
+    buildJobDep(job.jobId, job.finalStage)
+    recordStageDep(job.jobId)
+    recordStageDepGraph(job.jobId, job.finalStage)
+    jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
+  }
+}
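
A hedged usage sketch: registering the logger on a live SparkContext (named `sc` here,
and assumed to expose addSparkListener for SparkListener implementations) so that
per-job log files land under SPARK_LOG_DIR, or /tmp/spark when that variable is unset;
the directory name is made up:

    val jobLogger = new JobLogger("my-app-logs")
    sc.addSparkListener(jobLogger)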

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
new file mode 100644
index 0000000..c381348
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A result of a job in the DAGScheduler.
+ */
+private[spark] sealed trait JobResult
+
+private[spark] case object JobSucceeded extends JobResult
+private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
new file mode 100644
index 0000000..200d881
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
+ * results to the given handler function.
+ */
+private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
+  extends JobListener {
+
+  private var finishedTasks = 0
+
+  private var jobFinished = false          // Is the job as a whole finished (succeeded or failed)?
+  private var jobResult: JobResult = null  // If the job is finished, this will be its result
+
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
+      }
+      resultHandler(index, result.asInstanceOf[T])
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        jobFinished = true
+        jobResult = JobSucceeded
+        this.notifyAll()
+      }
+    }
+  }
+
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+      }
+      jobFinished = true
+      jobResult = JobFailed(exception, None)
+      this.notifyAll()
+    }
+  }
+
+  def awaitResult(): JobResult = synchronized {
+    while (!jobFinished) {
+      this.wait()
+    }
+    return jobResult
+  }
+}
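
A minimal sketch of how a 4-task job's results might be collected through this class
(hypothetical wiring; in the real code the DAGScheduler passes the waiter in as the
job's JobListener):

    val results = new Array[Int](4)
    val waiter = new JobWaiter[Int](4, (index, value) => results(index) = value)
    // ... submit the job with `waiter` as its JobListener ...
    waiter.awaitResult() match {
      case JobSucceeded    => println("results: " + results.mkString(", "))
      case JobFailed(e, _) => println("job failed: " + e)
    }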

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
new file mode 100644
index 0000000..1c61687
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.storage.BlockManagerId
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+
+/**
+ * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
+ * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
+ * The map output sizes are compressed using MapOutputTracker.compressSize.
+ */
+private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
+  extends Externalizable {
+
+  def this() = this(null, null)  // For deserialization only
+
+  def writeExternal(out: ObjectOutput) {
+    location.writeExternal(out)
+    out.writeInt(compressedSizes.length)
+    out.write(compressedSizes)
+  }
+
+  def readExternal(in: ObjectInput) {
+    location = BlockManagerId(in)
+    compressedSizes = new Array[Byte](in.readInt())
+    in.readFully(compressedSizes)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
new file mode 100644
index 0000000..2f157cc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark._
+import java.io._
+import util.{MetadataCleaner, TimeStampedHashMap}
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+private[spark] object ResultTask {
+
+  // A simple map from stage id to the serialized byte array of a task.
+  // Serves as a cache for task serialization, because serialization can be
+  // expensive on the master node if it needs to launch thousands of tasks.
+  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
+
+  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
+
+  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
+    synchronized {
+      val old = serializedInfoCache.get(stageId).orNull
+      if (old != null) {
+        return old
+      } else {
+        val out = new ByteArrayOutputStream
+        val ser = SparkEnv.get.closureSerializer.newInstance
+        val objOut = ser.serializeStream(new GZIPOutputStream(out))
+        objOut.writeObject(rdd)
+        objOut.writeObject(func)
+        objOut.close()
+        val bytes = out.toByteArray
+        serializedInfoCache.put(stageId, bytes)
+        return bytes
+      }
+    }
+  }
+
+  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
+    val loader = Thread.currentThread.getContextClassLoader
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val ser = SparkEnv.get.closureSerializer.newInstance
+    val objIn = ser.deserializeStream(in)
+    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
+    return (rdd, func)
+  }
+
+  def clearCache() {
+    synchronized {
+      serializedInfoCache.clear()
+    }
+  }
+}
+
+
+private[spark] class ResultTask[T, U](
+    stageId: Int,
+    var rdd: RDD[T],
+    var func: (TaskContext, Iterator[T]) => U,
+    var partition: Int,
+    @transient locs: Seq[TaskLocation],
+    val outputId: Int)
+  extends Task[U](stageId) with Externalizable {
+
+  def this() = this(0, null, null, 0, null, 0)
+
+  var split = if (rdd == null) {
+    null
+  } else {
+    rdd.partitions(partition)
+  }
+
+  @transient private val preferredLocs: Seq[TaskLocation] = {
+    if (locs == null) Nil else locs.toSet.toSeq
+  }
+
+  override def run(attemptId: Long): U = {
+    val context = new TaskContext(stageId, partition, attemptId)
+    metrics = Some(context.taskMetrics)
+    try {
+      func(context, rdd.iterator(split, context))
+    } finally {
+      context.executeOnCompleteCallbacks()
+    }
+  }
+
+  override def preferredLocations: Seq[TaskLocation] = preferredLocs
+
+  override def toString = "ResultTask(" + stageId + ", " + partition + ")"
+
+  override def writeExternal(out: ObjectOutput) {
+    RDDCheckpointData.synchronized {
+      split = rdd.partitions(partition)
+      out.writeInt(stageId)
+      val bytes = ResultTask.serializeInfo(
+        stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
+      out.writeInt(bytes.length)
+      out.write(bytes)
+      out.writeInt(partition)
+      out.writeInt(outputId)
+      out.writeLong(epoch)
+      out.writeObject(split)
+    }
+  }
+
+  override def readExternal(in: ObjectInput) {
+    val stageId = in.readInt()
+    val numBytes = in.readInt()
+    val bytes = new Array[Byte](numBytes)
+    in.readFully(bytes)
+    val (rdd_, func_) = ResultTask.deserializeInfo(stageId, bytes)
+    rdd = rdd_.asInstanceOf[RDD[T]]
+    func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
+    partition = in.readInt()
+    val outputId = in.readInt()
+    epoch = in.readLong()
+    split = in.readObject().asInstanceOf[Partition]
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
new file mode 100644
index 0000000..ca716b4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage._
+import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+
+
+private[spark] object ShuffleMapTask {
+
+  // A simple map from stage id to the serialized byte array of a task.
+  // Serves as a cache for task serialization, because serialization can be
+  // expensive on the master node if it needs to launch thousands of tasks.
+  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
+
+  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
+
+  def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
+    synchronized {
+      val old = serializedInfoCache.get(stageId).orNull
+      if (old != null) {
+        return old
+      } else {
+        val out = new ByteArrayOutputStream
+        val ser = SparkEnv.get.closureSerializer.newInstance()
+        val objOut = ser.serializeStream(new GZIPOutputStream(out))
+        objOut.writeObject(rdd)
+        objOut.writeObject(dep)
+        objOut.close()
+        val bytes = out.toByteArray
+        serializedInfoCache.put(stageId, bytes)
+        return bytes
+      }
+    }
+  }
+
+  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
+    synchronized {
+      val loader = Thread.currentThread.getContextClassLoader
+      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+      val ser = SparkEnv.get.closureSerializer.newInstance()
+      val objIn = ser.deserializeStream(in)
+      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
+      return (rdd, dep)
+    }
+  }
+
+  // Since both the JarSet and FileSet have the same format, this is used for both.
+  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
+    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+    val objIn = new ObjectInputStream(in)
+    val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
+    return (HashMap(set.toSeq: _*))
+  }
+
+  def clearCache() {
+    synchronized {
+      serializedInfoCache.clear()
+    }
+  }
+}
+
+private[spark] class ShuffleMapTask(
+    stageId: Int,
+    var rdd: RDD[_],
+    var dep: ShuffleDependency[_,_],
+    var partition: Int,
+    @transient private var locs: Seq[TaskLocation])
+  extends Task[MapStatus](stageId)
+  with Externalizable
+  with Logging {
+
+  protected def this() = this(0, null, null, 0, null)
+
+  @transient private val preferredLocs: Seq[TaskLocation] = {
+    if (locs == null) Nil else locs.toSet.toSeq
+  }
+
+  var split = if (rdd == null) null else rdd.partitions(partition)
+
+  override def writeExternal(out: ObjectOutput) {
+    RDDCheckpointData.synchronized {
+      split = rdd.partitions(partition)
+      out.writeInt(stageId)
+      val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
+      out.writeInt(bytes.length)
+      out.write(bytes)
+      out.writeInt(partition)
+      out.writeLong(epoch)
+      out.writeObject(split)
+    }
+  }
+
+  override def readExternal(in: ObjectInput) {
+    val stageId = in.readInt()
+    val numBytes = in.readInt()
+    val bytes = new Array[Byte](numBytes)
+    in.readFully(bytes)
+    val (rdd_, dep_) = ShuffleMapTask.deserializeInfo(stageId, bytes)
+    rdd = rdd_
+    dep = dep_
+    partition = in.readInt()
+    epoch = in.readLong()
+    split = in.readObject().asInstanceOf[Partition]
+  }
+
+  override def run(attemptId: Long): MapStatus = {
+    val numOutputSplits = dep.partitioner.numPartitions
+
+    val taskContext = new TaskContext(stageId, partition, attemptId)
+    metrics = Some(taskContext.taskMetrics)
+
+    val blockManager = SparkEnv.get.blockManager
+    var shuffle: ShuffleBlocks = null
+    var buckets: ShuffleWriterGroup = null
+
+    try {
+      // Obtain all the block writers for shuffle blocks.
+      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
+      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+      buckets = shuffle.acquireWriters(partition)
+
+      // Write the map output to its associated buckets.
+      for (elem <- rdd.iterator(split, taskContext)) {
+        val pair = elem.asInstanceOf[Product2[Any, Any]]
+        val bucketId = dep.partitioner.getPartition(pair._1)
+        buckets.writers(bucketId).write(pair)
+      }
+
+      // Commit the writes. Get the size of each bucket block (total block size).
+      var totalBytes = 0L
+      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
+        writer.commit()
+        writer.close()
+        val size = writer.size()
+        totalBytes += size
+        MapOutputTracker.compressSize(size)
+      }
+
+      // Update shuffle metrics.
+      val shuffleMetrics = new ShuffleWriteMetrics
+      shuffleMetrics.shuffleBytesWritten = totalBytes
+      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
+
+      return new MapStatus(blockManager.blockManagerId, compressedSizes)
+    } catch { case e: Exception =>
+      // If there is an exception from running the task, revert the partial writes
+      // and throw the exception upstream to Spark.
+      if (buckets != null) {
+        buckets.writers.foreach(_.revertPartialWrites())
+      }
+      throw e
+    } finally {
+      // Release the writers back to the shuffle block manager.
+      if (shuffle != null && buckets != null) {
+        shuffle.releaseWriters(buckets)
+      }
+      // Execute the callbacks on task completion.
+      taskContext.executeOnCompleteCallbacks()
+    }
+  }
+
+  override def preferredLocations: Seq[TaskLocation] = preferredLocs
+
+  override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
new file mode 100644
index 0000000..3504424
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+import org.apache.spark.scheduler.cluster.TaskInfo
+import org.apache.spark.util.Distribution
+import org.apache.spark.{Logging, SparkContext, TaskEndReason, Utils}
+import org.apache.spark.executor.TaskMetrics
+
+sealed trait SparkListenerEvents
+
+case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
+     extends SparkListenerEvents
+
+case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
+
+case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+
+case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
+     taskMetrics: TaskMetrics) extends SparkListenerEvents
+
+case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
+     extends SparkListenerEvents
+
+case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
+     extends SparkListenerEvents
+
+trait SparkListener {
+  /**
+   * Called when a stage is completed, with information on the completed stage
+   */
+  def onStageCompleted(stageCompleted: StageCompleted) { }
+
+  /**
+   * Called when a stage is submitted
+   */
+  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
+
+  /**
+   * Called when a task starts
+   */
+  def onTaskStart(taskEnd: SparkListenerTaskStart) { }
+
+  /**
+   * Called when a task ends
+   */
+  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
+
+  /**
+   * Called when a job starts
+   */
+  def onJobStart(jobStart: SparkListenerJobStart) { }
+
+  /**
+   * Called when a job ends
+   */
+  def onJobEnd(jobEnd: SparkListenerJobEnd) { }
+
+}
+
+/**
+ * Simple SparkListener that logs a few summary statistics when each stage completes
+ */
+class StatsReportListener extends SparkListener with Logging {
+  override def onStageCompleted(stageCompleted: StageCompleted) {
+    import org.apache.spark.scheduler.StatsReportListener._
+    implicit val sc = stageCompleted
+    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
+    showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
+
+    //shuffle write
+    showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
+
+    //fetch & io
+    showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
+    showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
+    showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
+
+    //runtime breakdown
+
+    val runtimePcts = stageCompleted.stageInfo.taskInfos.map{
+      case (info, metrics) => RuntimePercentage(info.duration, metrics)
+    }
+    showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+    showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
+    showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
+  }
+
+}
+
+object StatsReportListener extends Logging {
+
+  //for profiling, the extremes are more interesting
+  val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
+  val probabilities = percentiles.map{_ / 100.0}
+  val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
+
+  def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
+    Distribution(stage.stageInfo.taskInfos.flatMap{
+      case ((info,metric)) => getMetric(info, metric)})
+  }
+
+  // Is there some way to set up the types so that I can get rid of this completely?
+  def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
+    extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
+  }
+
+  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
+    val stats = d.statCounter
+    logInfo(heading + stats)
+    val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    logInfo(percentilesHeader)
+    logInfo("\t" + quantiles.mkString("\t"))
+  }
+
+  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
+    dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
+  }
+
+  def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
+    def f(d:Double) = format.format(d)
+    showDistribution(heading, dOpt, f _)
+  }
+
+  def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
+    (implicit stage: StageCompleted) {
+    showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
+  }
+
+  def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
+    (implicit stage: StageCompleted) {
+    showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
+  }
+
+  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
+    dOpt.foreach{dist => showBytesDistribution(heading, dist)}
+  }
+
+  def showBytesDistribution(heading: String, dist: Distribution) {
+    showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
+  }
+
+  def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
+    showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
+  }
+
+  def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
+    (implicit stage: StageCompleted) {
+    showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
+  }
+
+
+
+  val seconds = 1000L
+  val minutes = seconds * 60
+  val hours = minutes * 60
+
+  /**
+   * reformat a time interval in milliseconds to a prettier format for output
+   */
+  def millisToString(ms: Long) = {
+    val (size, units) =
+      if (ms > hours) {
+        (ms.toDouble / hours, "hours")
+      } else if (ms > minutes) {
+        (ms.toDouble / minutes, "min")
+      } else if (ms > seconds) {
+        (ms.toDouble / seconds, "s")
+      } else {
+        (ms.toDouble, "ms")
+      }
+    "%.1f %s".format(size, units)
+  }
+}
+
+
+
+case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
+object RuntimePercentage {
+  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
+    val denom = totalTime.toDouble
+    val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
+    val fetch = fetchTime.map{_ / denom}
+    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
+    val other = 1.0 - (exec + fetch.getOrElse(0d))
+    RuntimePercentage(exec, fetch, other)
+  }
+}
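
A minimal sketch of a custom listener built on this trait (the class is hypothetical,
and registration assumes a live SparkContext `sc` with addSparkListener). As a worked
example of the helper above, millisToString(125000) formats as "2.1 min":

    class StageCountingListener extends SparkListener {
      var stagesCompleted = 0

      override def onStageCompleted(stageCompleted: StageCompleted) {
        stagesCompleted += 1
      }

      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        println("job " + jobEnd.job.jobId + " ended (" + jobEnd.jobResult +
          ") after " + stagesCompleted + " completed stages")
      }
    }

    // sc.addSparkListener(new StageCountingListener)
    // sc.addSparkListener(new StatsReportListener)  // the stats listener defined above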

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
new file mode 100644
index 0000000..a65e1ec
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import org.apache.spark.Logging
+
+/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
+private[spark] class SparkListenerBus() extends Logging {
+  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000 
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
+  private var queueFullErrorMessageLogged = false
+
+  new Thread("SparkListenerBus") {
+    setDaemon(true)
+    override def run() {
+      while (true) {
+        val event = eventQueue.take
+        event match {
+          case stageSubmitted: SparkListenerStageSubmitted =>
+            sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+          case stageCompleted: StageCompleted =>
+            sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+          case jobStart: SparkListenerJobStart =>
+            sparkListeners.foreach(_.onJobStart(jobStart))
+          case jobEnd: SparkListenerJobEnd =>
+            sparkListeners.foreach(_.onJobEnd(jobEnd))
+          case taskStart: SparkListenerTaskStart =>
+            sparkListeners.foreach(_.onTaskStart(taskStart))
+          case taskEnd: SparkListenerTaskEnd =>
+            sparkListeners.foreach(_.onTaskEnd(taskEnd))
+          case _ =>
+        }
+      }
+    }
+  }.start()
+
+  def addListener(listener: SparkListener) {
+    sparkListeners += listener
+  }
+
+  def post(event: SparkListenerEvents) {
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded && !queueFullErrorMessageLogged) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+      queueFullErrorMessageLogged = true
+    }
+  }
+}
+
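
For readers skimming the patch, a minimal self-contained sketch of the listener-bus pattern above -- a bounded LinkedBlockingQueue drained by a single daemon thread, with events dropped once the queue fills -- could look like the following (the EventBusSketch name and the use of plain String events are illustrative stand-ins, not part of this patch):

import java.util.concurrent.LinkedBlockingQueue

// Illustrative stand-in for the listener-bus pattern: a bounded queue plus a
// single daemon consumer thread. EventBusSketch and its members are
// hypothetical names, not part of the Spark code above.
class EventBusSketch(capacity: Int = 10000) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  @volatile private var dropWarned = false

  // Single consumer thread: take() blocks until an event is available.
  new Thread("event-bus-sketch") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = queue.take()
        println("handled " + event)
      }
    }
  }.start()

  // offer() returns false instead of blocking when the queue is full, so a
  // slow consumer surfaces as dropped events rather than an OOM.
  def post(event: String) {
    if (!queue.offer(event) && !dropWarned) {
      System.err.println("dropping events: consumer cannot keep up")
      dropWarned = true
    }
  }
}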

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
new file mode 100644
index 0000000..5b40a3e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+
+// Information about a specific split instance: handles both the mapred and mapreduce API split
+// instances, so that callers do not need to worry about the differences between them.
+class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
+                val length: Long, val underlyingSplit: Any) {
+  override def toString(): String = {
+    "SplitInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz +
+      ", hostLocation : " + hostLocation + ", path : " + path +
+      ", length : " + length + ", underlyingSplit " + underlyingSplit
+  }
+
+  override def hashCode(): Int = {
+    var hashCode = inputFormatClazz.hashCode
+    hashCode = hashCode * 31 + hostLocation.hashCode
+    hashCode = hashCode * 31 + path.hashCode
+    // Ignore potential overflow; this is only a hash code anyway.
+    hashCode = hashCode * 31 + (length & 0x7fffffff).toInt
+    hashCode
+  }
+
+  // This is of limited use since most Split implementations don't seem to implement equals :-(
+  // So unless there is reference equality between the underlyingSplits, this will fail even if
+  // both are pointing to the same block.
+  override def equals(other: Any): Boolean = other match {
+    case that: SplitInfo => {
+      this.hostLocation == that.hostLocation &&
+        this.inputFormatClazz == that.inputFormatClazz &&
+        this.path == that.path &&
+        this.length == that.length &&
+        // other split specific checks (like start for FileSplit)
+        this.underlyingSplit == that.underlyingSplit
+    }
+    case _ => false
+  }
+}
+
+object SplitInfo {
+
+  def toSplitInfo(inputFormatClazz: Class[_], path: String,
+                  mapredSplit: org.apache.hadoop.mapred.InputSplit): Seq[SplitInfo] = {
+    val retval = new ArrayBuffer[SplitInfo]()
+    val length = mapredSplit.getLength
+    for (host <- mapredSplit.getLocations) {
+      retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit)
+    }
+    retval
+  }
+
+  def toSplitInfo(inputFormatClazz: Class[_], path: String,
+                  mapreduceSplit: org.apache.hadoop.mapreduce.InputSplit): Seq[SplitInfo] = {
+    val retval = new ArrayBuffer[SplitInfo]()
+    val length = mapreduceSplit.getLength
+    for (host <- mapreduceSplit.getLocations) {
+      retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit)
+    }
+    retval
+  }
+}
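
As a usage illustration of the toSplitInfo helpers (assuming the Hadoop client classes are on the classpath, as they are for this module), expanding a classic-API FileSplit with two replica hosts yields one SplitInfo per host; the path and host names below are made up:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.scheduler.SplitInfo

object SplitInfoExample {
  def main(args: Array[String]) {
    // A 64 MB split of /data/part-00000 replicated on two hosts.
    val split = new FileSplit(new Path("/data/part-00000"), 0L, 64L * 1024 * 1024,
      Array("host1", "host2"))

    val infos = SplitInfo.toSplitInfo(classOf[TextInputFormat], "/data/part-00000", split)

    // One SplitInfo per preferred host, all pointing at the same underlying split.
    infos.foreach(println)  // prints two SplitInfo lines, one for host1 and one for host2
  }
}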

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
new file mode 100644
index 0000000..87b1fe4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.net.URI
+
+import org.apache.spark._
+import org.apache.spark.storage.BlockManagerId
+
+/**
+ * A stage is a set of independent tasks all computing the same function that need to run as part
+ * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
+ * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
+ * DAGScheduler runs these stages in topological order.
+ *
+ * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
+ * another stage, or a result stage, in which case its tasks directly compute the action that
+ * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
+ * that each output partition is on.
+ *
+ * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
+ * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
+ * faster on failure.
+ */
+private[spark] class Stage(
+    val id: Int,
+    val rdd: RDD[_],
+    val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
+    val parents: List[Stage],
+    val jobId: Int,
+    callSite: Option[String])
+  extends Logging {
+
+  val isShuffleMap = shuffleDep.isDefined
+  val numPartitions = rdd.partitions.size
+  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
+  var numAvailableOutputs = 0
+
+  /** When first task was submitted to scheduler. */
+  var submissionTime: Option[Long] = None
+  var completionTime: Option[Long] = None
+
+  private var nextAttemptId = 0
+
+  def isAvailable: Boolean = {
+    if (!isShuffleMap) {
+      true
+    } else {
+      numAvailableOutputs == numPartitions
+    }
+  }
+
+  def addOutputLoc(partition: Int, status: MapStatus) {
+    val prevList = outputLocs(partition)
+    outputLocs(partition) = status :: prevList
+    if (prevList == Nil)
+      numAvailableOutputs += 1
+  }
+
+  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
+    val prevList = outputLocs(partition)
+    val newList = prevList.filterNot(_.location == bmAddress)
+    outputLocs(partition) = newList
+    if (prevList != Nil && newList == Nil) {
+      numAvailableOutputs -= 1
+    }
+  }
+
+  def removeOutputsOnExecutor(execId: String) {
+    var becameUnavailable = false
+    for (partition <- 0 until numPartitions) {
+      val prevList = outputLocs(partition)
+      val newList = prevList.filterNot(_.location.executorId == execId)
+      outputLocs(partition) = newList
+      if (prevList != Nil && newList == Nil) {
+        becameUnavailable = true
+        numAvailableOutputs -= 1
+      }
+    }
+    if (becameUnavailable) {
+      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
+        this, execId, numAvailableOutputs, numPartitions, isAvailable))
+    }
+  }
+
+  def newAttemptId(): Int = {
+    val id = nextAttemptId
+    nextAttemptId += 1
+    id
+  }
+
+  val name = callSite.getOrElse(rdd.origin)
+
+  override def toString = "Stage " + id
+
+  override def hashCode(): Int = id
+}
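
To make the output-location bookkeeping above concrete, here is a small self-contained sketch of the same counting scheme, with plain executor-id strings standing in for MapStatus and BlockManagerId (the OutputTrackerSketch class and its names are illustrative only):

// Illustrative sketch of Stage's availability bookkeeping: a partition counts as
// available while it has at least one registered output location, and the stage
// as a whole is available once every partition does.
class OutputTrackerSketch(numPartitions: Int) {
  private val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
  private var numAvailableOutputs = 0

  def isAvailable: Boolean = numAvailableOutputs == numPartitions

  def addOutputLoc(partition: Int, execId: String) {
    val prevList = outputLocs(partition)
    outputLocs(partition) = execId :: prevList
    if (prevList == Nil) numAvailableOutputs += 1   // partition just became available
  }

  def removeOutputsOnExecutor(execId: String) {
    for (partition <- 0 until numPartitions) {
      val prevList = outputLocs(partition)
      val newList = prevList.filterNot(_ == execId)
      outputLocs(partition) = newList
      if (prevList != Nil && newList == Nil) numAvailableOutputs -= 1
    }
  }
}

// e.g. with 2 partitions, adding outputs for both on "exec1" makes the sketch
// available; removeOutputsOnExecutor("exec1") makes it unavailable again.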

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
new file mode 100644
index 0000000..72cb1c9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection._
+import org.apache.spark.executor.TaskMetrics
+
+case class StageInfo(
+    stage: Stage,
+    taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+) {
+  override def toString = stage.rdd.toString
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
new file mode 100644
index 0000000..598d917
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.ByteBuffer
+import scala.collection.mutable.HashMap
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.ByteBufferInputStream
+
+/**
+ * A task to execute on a worker node.
+ */
+private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
+  def run(attemptId: Long): T
+  def preferredLocations: Seq[TaskLocation] = Nil
+
+  var epoch: Long = -1   // Map output tracker epoch. Will be set by TaskScheduler.
+
+  var metrics: Option[TaskMetrics] = None
+
+}
+
+/**
+ * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
+ * need to send the list of JARs and files added to the SparkContext with each task to ensure that
+ * worker nodes find out about it, but we can't make it part of the Task because the user's code in
+ * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
+ * first writing out its dependencies.
+ */
+private[spark] object Task {
+  /**
+   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
+   */
+  def serializeWithDependencies(
+      task: Task[_],
+      currentFiles: HashMap[String, Long],
+      currentJars: HashMap[String, Long],
+      serializer: SerializerInstance)
+    : ByteBuffer = {
+
+    val out = new FastByteArrayOutputStream(4096)
+    val dataOut = new DataOutputStream(out)
+
+    // Write currentFiles
+    dataOut.writeInt(currentFiles.size)
+    for ((name, timestamp) <- currentFiles) {
+      dataOut.writeUTF(name)
+      dataOut.writeLong(timestamp)
+    }
+
+    // Write currentJars
+    dataOut.writeInt(currentJars.size)
+    for ((name, timestamp) <- currentJars) {
+      dataOut.writeUTF(name)
+      dataOut.writeLong(timestamp)
+    }
+
+    // Write the task itself and finish
+    dataOut.flush()
+    val taskBytes = serializer.serialize(task).array()
+    out.write(taskBytes)
+    out.trim()
+    ByteBuffer.wrap(out.array)
+  }
+
+  /**
+   * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
+   * and return the task itself as a serialized ByteBuffer. The caller can then update its
+   * ClassLoaders and deserialize the task.
+   *
+   * @return (taskFiles, taskJars, taskBytes)
+   */
+  def deserializeWithDependencies(serializedTask: ByteBuffer)
+    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
+
+    val in = new ByteBufferInputStream(serializedTask)
+    val dataIn = new DataInputStream(in)
+
+    // Read task's files
+    val taskFiles = new HashMap[String, Long]()
+    val numFiles = dataIn.readInt()
+    for (i <- 0 until numFiles) {
+      taskFiles(dataIn.readUTF()) = dataIn.readLong()
+    }
+
+    // Read task's JARs
+    val taskJars = new HashMap[String, Long]()
+    val numJars = dataIn.readInt()
+    for (i <- 0 until numJars) {
+      taskJars(dataIn.readUTF()) = dataIn.readLong()
+    }
+
+    // Create a sub-buffer for the rest of the data, which is the serialized Task object
+    val subBuffer = serializedTask.slice()  // ByteBufferInputStream will have read just up to task
+    (taskFiles, taskJars, subBuffer)
+  }
+}
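
The wire format used above is: an int count followed by (UTF name, long timestamp) pairs for the files, the same for the JARs, then the serialized task bytes. A self-contained sketch of that framing, using only JDK streams and a plain byte-array payload in place of a serialized Task, might look like this (FramingSketch and its method names are invented for illustration):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.HashMap

object FramingSketch {
  // Write: count + (name, timestamp) pairs for files, then for jars, then the payload.
  def frame(files: HashMap[String, Long], jars: HashMap[String, Long],
            payload: Array[Byte]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeInt(files.size)
    for ((name, ts) <- files) { out.writeUTF(name); out.writeLong(ts) }
    out.writeInt(jars.size)
    for ((name, ts) <- jars) { out.writeUTF(name); out.writeLong(ts) }
    out.write(payload)
    out.flush()
    bytes.toByteArray
  }

  // Read the two maps back; whatever remains in the stream is the payload.
  def unframe(framed: Array[Byte]): (HashMap[String, Long], HashMap[String, Long], Array[Byte]) = {
    val in = new DataInputStream(new ByteArrayInputStream(framed))
    def readMap(): HashMap[String, Long] = {
      val m = new HashMap[String, Long]()
      for (_ <- 0 until in.readInt()) { m(in.readUTF()) = in.readLong() }
      m
    }
    val files = readMap()
    val jars = readMap()
    val rest = new Array[Byte](in.available())
    in.readFully(rest)
    (files, jars, rest)
  }
}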

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
new file mode 100644
index 0000000..67c9a67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A location where a task should run. This can either be a host or a (host, executorID) pair.
+ * In the latter case, we will prefer to launch the task on that executorID, but our next level
+ * of preference will be executors on the same host if this is not possible.
+ */
+private[spark]
+class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
+  override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
+}
+
+private[spark] object TaskLocation {
+  def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
+
+  def apply(host: String) = new TaskLocation(host, None)
+}
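
A quick usage sketch, assuming it is compiled inside the org.apache.spark packages (the class is private[spark]); the two factory methods express executor-level versus host-level preference:

package org.apache.spark.scheduler

object TaskLocationExample {
  def main(args: Array[String]) {
    // Prefer a specific executor, falling back to any executor on the same host.
    val execLevel = TaskLocation("host1", "exec-3")
    // Host-level preference only.
    val hostLevel = TaskLocation("host1")
    println(execLevel)  // TaskLocation(host1, Some(exec-3))
    println(hostLevel)  // TaskLocation(host1, None)
  }
}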

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
new file mode 100644
index 0000000..776675d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.nio.ByteBuffer
+
+import scala.collection.mutable.Map
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.{SparkEnv, Utils}
+
+// Task result. Also contains updates to accumulator variables.
+// TODO: Use of distributed cache to return result is a hack to get around
+// what seems to be a bug with messages over 60KB in libprocess; fix it
+private[spark]
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+  extends Externalizable
+{
+  def this() = this(null.asInstanceOf[T], null, null)
+
+  override def writeExternal(out: ObjectOutput) {
+
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    val bb = objectSer.serialize(value)
+
+    out.writeInt(bb.remaining())
+    Utils.writeByteBuffer(bb, out)
+
+    out.writeInt(accumUpdates.size)
+    for ((key, value) <- accumUpdates) {
+      out.writeLong(key)
+      out.writeObject(value)
+    }
+    out.writeObject(metrics)
+  }
+
+  override def readExternal(in: ObjectInput) {
+
+    val objectSer = SparkEnv.get.serializer.newInstance()
+
+    val blen = in.readInt()
+    val byteVal = new Array[Byte](blen)
+    in.readFully(byteVal)
+    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+
+    val numUpdates = in.readInt
+    if (numUpdates == 0) {
+      accumUpdates = null
+    } else {
+      accumUpdates = Map()
+      for (i <- 0 until numUpdates) {
+        accumUpdates(in.readLong()) = in.readObject()
+      }
+    }
+    metrics = in.readObject().asInstanceOf[TaskMetrics]
+  }
+}
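
TaskResult hand-rolls its wire format through Externalizable instead of relying on default Java serialization. Below is a self-contained sketch of the same writeExternal/readExternal round trip for a simpler value-plus-map payload, using only JDK classes; PayloadSketch and its fields are illustrative, not part of this patch:

import java.io._

// Illustrative Externalizable payload: a string value plus keyed updates,
// written out field by field much like TaskResult does.
class PayloadSketch(var value: String, var updates: Map[Long, Any]) extends Externalizable {
  def this() = this(null, null)   // no-arg constructor required for deserialization

  override def writeExternal(out: ObjectOutput) {
    out.writeUTF(value)
    out.writeInt(updates.size)
    for ((k, v) <- updates) { out.writeLong(k); out.writeObject(v) }
  }

  override def readExternal(in: ObjectInput) {
    value = in.readUTF()
    val n = in.readInt()
    updates = (0 until n).map(_ => (in.readLong(), in.readObject())).toMap
  }
}

object PayloadSketch {
  def main(args: Array[String]) {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(new PayloadSketch("ok", Map(1L -> 42, 2L -> "x")))
    oos.close()

    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val back = ois.readObject().asInstanceOf[PayloadSketch]
    println(back.value + " " + back.updates)   // ok Map(1 -> 42, 2 -> x)
  }
}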

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
new file mode 100644
index 0000000..63be8ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.Pool
+import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+/**
+ * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
+ * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
+ * and are responsible for sending the tasks to the cluster, running them, retrying if there
+ * are failures, and mitigating stragglers. They return events to the DAGScheduler through
+ * the TaskSchedulerListener interface.
+ */
+private[spark] trait TaskScheduler {
+
+  def rootPool: Pool
+
+  def schedulingMode: SchedulingMode
+
+  def start(): Unit
+
+  // Invoked after the system has successfully initialized (typically in the SparkContext).
+  // YARN uses this to bootstrap resource allocation based on preferred locations, wait for slave registrations, etc.
+  def postStartHook() { }
+
+  // Disconnect from the cluster.
+  def stop(): Unit
+
+  // Submit a sequence of tasks to run.
+  def submitTasks(taskSet: TaskSet): Unit
+
+  // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
+  def setListener(listener: TaskSchedulerListener): Unit
+
+  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
+  def defaultParallelism(): Int
+}
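
To show what an implementer of this trait has to provide, here is a hedged skeleton (not a working scheduler: the bodies are stubs, rootPool is left unset, and a real implementation would build a Pool and launch tasks instead of failing the set):

package org.apache.spark.scheduler

import org.apache.spark.scheduler.cluster.Pool
import org.apache.spark.scheduler.cluster.SchedulingMode

// Skeleton only: every member is a stub so the shape of the contract is visible.
private[spark] class NoOpTaskScheduler extends TaskScheduler {
  private var listener: TaskSchedulerListener = null

  // A real scheduler constructs a Pool here; the skeleton leaves it unset.
  def rootPool: Pool = null

  def schedulingMode: SchedulingMode.SchedulingMode = SchedulingMode.FIFO

  def start() {}   // connect to the cluster backend
  def stop() {}    // disconnect and clean up

  def submitTasks(taskSet: TaskSet) {
    // A real implementation wraps the TaskSet in a manager and offers it to
    // executors; the skeleton immediately reports the whole set as failed.
    if (listener != null) {
      listener.taskSetFailed(taskSet, "NoOpTaskScheduler does not run tasks")
    }
  }

  def setListener(l: TaskSchedulerListener) { listener = l }

  def defaultParallelism(): Int = 1
}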

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
new file mode 100644
index 0000000..83be051
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection.mutable.Map
+
+import org.apache.spark.TaskEndReason
+import org.apache.spark.executor.TaskMetrics
+
+/**
+ * Interface for getting events back from the TaskScheduler.
+ */
+private[spark] trait TaskSchedulerListener {
+  // A task has started.
+  def taskStarted(task: Task[_], taskInfo: TaskInfo)
+
+  // A task has finished or failed.
+  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
+                taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
+
+  // A node was added to the cluster.
+  def executorGained(execId: String, host: String): Unit
+
+  // A node was lost from the cluster.
+  def executorLost(execId: String): Unit
+
+  // The TaskScheduler wants to abort an entire task set.
+  def taskSetFailed(taskSet: TaskSet, reason: String): Unit
+}
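
In the real code the DAGScheduler is the listener. As a shape-only illustration, a trivial implementation that merely prints each upcall could look like the following; the class name and messages are made up:

package org.apache.spark.scheduler

import scala.collection.mutable.Map

import org.apache.spark.TaskEndReason
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.TaskInfo

// Shape-only example: prints each scheduler upcall instead of driving a DAG.
private[spark] class PrintingTaskSchedulerListener extends TaskSchedulerListener {
  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
    println("task started: " + taskInfo)
  }

  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any,
                accumUpdates: Map[Long, Any], taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
    println("task ended: " + reason)
  }

  def executorGained(execId: String, host: String) {
    println("executor gained: " + execId + " on " + host)
  }

  def executorLost(execId: String) {
    println("executor lost: " + execId)
  }

  def taskSetFailed(taskSet: TaskSet, reason: String) {
    println(taskSet + " failed: " + reason)
  }
}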

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
new file mode 100644
index 0000000..c3ad325
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+/**
+ * A set of tasks submitted together to the low-level TaskScheduler, usually representing
+ * missing partitions of a particular stage.
+ */
+private[spark] class TaskSet(
+    val tasks: Array[Task[_]],
+    val stageId: Int,
+    val attempt: Int,
+    val priority: Int,
+    val properties: Properties) {
+  val id: String = stageId + "." + attempt
+
+  override def toString: String = "TaskSet " + id
+}
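
Finally, a small usage sketch of how the id is formed: stage 3, attempt 1 gives "3.1". The empty task array and null properties below are for illustration only:

package org.apache.spark.scheduler

object TaskSetIdExample {
  def main(args: Array[String]) {
    // Illustration only: no real tasks, no scheduling properties.
    val taskSet = new TaskSet(Array.empty[Task[_]], stageId = 3, attempt = 1,
      priority = 0, properties = null)
    println(taskSet.id)   // 3.1
    println(taskSet)      // TaskSet 3.1
  }
}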