Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:02 UTC

[18/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
deleted file mode 100644
index 8f1b9b2..0000000
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.{Logging, SparkEnv}
-import scala.collection.immutable.Set
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.conf.Configuration
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-
-/**
- * Parses and holds information about inputFormat (and files) specified as a parameter.
- */
-class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_], 
-                      val path: String) extends Logging {
-
-  var mapreduceInputFormat: Boolean = false
-  var mapredInputFormat: Boolean = false
-
-  validate()
-
-  override def toString(): String = {
-    "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
-  }
-
-  override def hashCode(): Int = {
-    var hashCode = inputFormatClazz.hashCode
-    hashCode = hashCode * 31 + path.hashCode
-    hashCode
-  }
-
-  // Since we are not doing canonicalization of path, this can be wrong : like relative vs absolute path
-  // .. which is fine, this is best case effort to remove duplicates - right ?
-  override def equals(other: Any): Boolean = other match {
-    case that: InputFormatInfo => {
-      // not checking config - that should be fine, right ?
-      this.inputFormatClazz == that.inputFormatClazz &&
-        this.path == that.path
-    }
-    case _ => false
-  }
-
-  private def validate() {
-    logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path  " + path)
-
-    try {
-      if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
-        logDebug("inputformat is from mapreduce package")
-        mapreduceInputFormat = true
-      }
-      else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
-        logDebug("inputformat is from mapred package")
-        mapredInputFormat = true
-      }
-      else {
-        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
-          " is NOT a supported input format ? does not implement either of the supported hadoop api's")
-      }
-    }
-    catch {
-      case e: ClassNotFoundException => {
-        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
-      }
-    }
-  }
-
-
-  // This method does not expect failures, since validate has already passed ...
-  private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
-    val conf = new JobConf(configuration)
-    env.hadoop.addCredentials(conf)
-    FileInputFormat.setInputPaths(conf, path)
-
-    val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
-      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
-        org.apache.hadoop.mapreduce.InputFormat[_, _]]
-    val job = new Job(conf)
-
-    val retval = new ArrayBuffer[SplitInfo]()
-    val list = instance.getSplits(job)
-    for (split <- list) {
-      retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
-    }
-
-    return retval.toSet
-  }
-
-  // This method does not expect failures, since validate has already passed ...
-  private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
-    val env = SparkEnv.get
-    val jobConf = new JobConf(configuration)
-    env.hadoop.addCredentials(jobConf)
-    FileInputFormat.setInputPaths(jobConf, path)
-
-    val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
-      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
-        org.apache.hadoop.mapred.InputFormat[_, _]]
-
-    val retval = new ArrayBuffer[SplitInfo]()
-    instance.getSplits(jobConf, jobConf.getNumMapTasks()).foreach(
-        elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
-    )
-
-    return retval.toSet
-   }
-
-  private def findPreferredLocations(): Set[SplitInfo] = {
-    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat + 
-      ", inputFormatClazz : " + inputFormatClazz)
-    if (mapreduceInputFormat) {
-      return prefLocsFromMapreduceInputFormat()
-    }
-    else {
-      assert(mapredInputFormat)
-      return prefLocsFromMapredInputFormat()
-    }
-  }
-}
-
-
-
-
-object InputFormatInfo {
-  /**
-    Computes the preferred locations based on input(s) and returned a location to block map.
-    Typical use of this method for allocation would follow some algo like this 
-    (which is what we currently do in YARN branch) :
-    a) For each host, count number of splits hosted on that host.
-    b) Decrement the currently allocated containers on that host.
-    c) Compute rack info for each host and update rack -> count map based on (b).
-    d) Allocate nodes based on (c)
-    e) On the allocation result, ensure that we dont allocate "too many" jobs on a single node 
-       (even if data locality on that is very high) : this is to prevent fragility of job if a single 
-       (or small set of) hosts go down.
-
-    go to (a) until required nodes are allocated.
-
-    If a node 'dies', follow same procedure.
-
-    PS: I know the wording here is weird, hopefully it makes some sense !
-  */
-  def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
-
-    val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
-    for (inputSplit <- formats) {
-      val splits = inputSplit.findPreferredLocations()
-
-      for (split <- splits){
-        val location = split.hostLocation
-        val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo])
-        set += split
-      }
-    }
-
-    nodeToSplit
-  }
-}
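
The computePreferredLocations scaladoc above sketches a host-level allocation loop: count splits per host, subtract containers already allocated there, roll the counts up by rack, allocate, and re-check until enough nodes are held. The method itself only builds the host -> splits map that feeds step (a); a small standalone sketch of that aggregation, using a hypothetical stand-in type rather than Spark's SplitInfo, looks like this:

    import scala.collection.mutable.{HashMap, HashSet}

    // Hypothetical stand-in for SplitInfo; only the host location matters here.
    case class FakeSplit(host: String, path: String, length: Long)

    def groupSplitsByHost(splits: Seq[FakeSplit]): HashMap[String, HashSet[FakeSplit]] = {
      val nodeToSplit = new HashMap[String, HashSet[FakeSplit]]
      for (split <- splits) {
        // Same getOrElseUpdate pattern as computePreferredLocations above.
        nodeToSplit.getOrElseUpdate(split.host, new HashSet[FakeSplit]) += split
      }
      nodeToSplit
    }

    // Usage: hosts holding more splits are better allocation candidates (step (a) above).
    val byHost = groupSplitsByHost(Seq(
      FakeSplit("host1", "/data/part-0", 64L),
      FakeSplit("host2", "/data/part-0", 64L),
      FakeSplit("host1", "/data/part-1", 64L)))
    val splitCounts = byHost.map { case (host, set) => host -> set.size }  // host1 -> 2, host2 -> 1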

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobListener.scala b/core/src/main/scala/spark/scheduler/JobListener.scala
deleted file mode 100644
index af108b8..0000000
--- a/core/src/main/scala/spark/scheduler/JobListener.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * Interface used to listen for job completion or failure events after submitting a job to the
- * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
- * job fails (and no further taskSucceeded events will happen).
- */
-private[spark] trait JobListener {
-  def taskSucceeded(index: Int, result: Any)
-  def jobFailed(exception: Exception)
-}
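
JobListener above is a two-callback interface: one call per successful task plus a single terminal failure call, after which no further taskSucceeded events arrive. The trait is private[spark], so the sketch below uses a structurally identical stand-in to show a typical implementation that collects results by index:

    // Stand-in trait with the same shape as the private[spark] JobListener above.
    trait SimpleJobListener {
      def taskSucceeded(index: Int, result: Any): Unit
      def jobFailed(exception: Exception): Unit
    }

    // Collects results by task index and remembers the terminal failure, if any.
    class CollectingListener(totalTasks: Int) extends SimpleJobListener {
      val results = new Array[Any](totalTasks)
      @volatile var failure: Option[Exception] = None

      override def taskSucceeded(index: Int, result: Any): Unit = { results(index) = result }
      override def jobFailed(exception: Exception): Unit = { failure = Some(exception) }
    }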

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
deleted file mode 100644
index 1bc9fab..0000000
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io.PrintWriter
-import java.io.File
-import java.io.FileNotFoundException
-import java.text.SimpleDateFormat
-import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{Map, HashMap, ListBuffer}
-import scala.io.Source
-
-import spark._
-import spark.executor.TaskMetrics
-import spark.scheduler.cluster.TaskInfo
-
-// Used to record runtime information for each job, including RDD graph 
-// tasks' start/stop shuffle information and information from outside
-
-class JobLogger(val logDirName: String) extends SparkListener with Logging {
-  private val logDir =  
-    if (System.getenv("SPARK_LOG_DIR") != null)  
-      System.getenv("SPARK_LOG_DIR")
-    else 
-      "/tmp/spark"
-  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] 
-  private val stageIDToJobID = new HashMap[Int, Int]
-  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
-  private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
-  
-  createLogDir()
-  def this() = this(String.valueOf(System.currentTimeMillis()))
-  
-  def getLogDir = logDir
-  def getJobIDtoPrintWriter = jobIDToPrintWriter
-  def getStageIDToJobID = stageIDToJobID
-  def getJobIDToStages = jobIDToStages
-  def getEventQueue = eventQueue
-  
-  // Create a folder for log files, the folder's name is the creation time of the jobLogger
-  protected def createLogDir() {
-    val dir = new File(logDir + "/" + logDirName + "/")
-    if (dir.exists()) {
-      return
-    }
-    if (dir.mkdirs() == false) {
-      logError("create log directory error:" + logDir + "/" + logDirName + "/")
-    }
-  }
-  
-  // Create a log file for one job, the file name is the jobID
-  protected def createLogWriter(jobID: Int) {
-    try{
-      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
-      jobIDToPrintWriter += (jobID -> fileWriter)
-      } catch {
-        case e: FileNotFoundException => e.printStackTrace()
-      }
-  }
-  
-  // Close log file, and clean the stage relationship in stageIDToJobID 
-  protected def closeLogWriter(jobID: Int) = 
-    jobIDToPrintWriter.get(jobID).foreach { fileWriter => 
-      fileWriter.close()
-      jobIDToStages.get(jobID).foreach(_.foreach{ stage => 
-        stageIDToJobID -= stage.id
-      })
-      jobIDToPrintWriter -= jobID
-      jobIDToStages -= jobID
-    }
-  
-  // Write log information to log file, withTime parameter controls whether to recored 
-  // time stamp for the information
-  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
-    var writeInfo = info
-    if (withTime) {
-      val date = new Date(System.currentTimeMillis())
-      writeInfo = DATE_FORMAT.format(date) + ": " +info
-    }
-    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
-  }
-  
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) = 
-    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-
-  protected def buildJobDep(jobID: Int, stage: Stage) {
-    if (stage.jobId == jobID) {
-      jobIDToStages.get(jobID) match {
-        case Some(stageList) => stageList += stage
-        case None => val stageList = new  ListBuffer[Stage]
-                     stageList += stage
-                     jobIDToStages += (jobID -> stageList)
-      }
-      stageIDToJobID += (stage.id -> jobID)
-      stage.parents.foreach(buildJobDep(jobID, _))
-    }
-  }
-
-  protected def recordStageDep(jobID: Int) {
-    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
-      var rddList = new ListBuffer[RDD[_]]
-      rddList += rdd
-      rdd.dependencies.foreach{ dep => dep match {
-          case shufDep: ShuffleDependency[_,_] =>
-          case _ => rddList ++= getRddsInStage(dep.rdd)
-        }
-      }
-      rddList
-    }
-    jobIDToStages.get(jobID).foreach {_.foreach { stage => 
-        var depRddDesc: String = ""
-        getRddsInStage(stage.rdd).foreach { rdd => 
-          depRddDesc += rdd.id + ","
-        }
-        var depStageDesc: String = ""
-        stage.parents.foreach { stage => 
-          depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
-        }
-        jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + 
-                   depRddDesc.substring(0, depRddDesc.length - 1) + ")" + 
-                   " STAGE_DEP=" + depStageDesc, false)
-      }
-    }
-  }
-  
-  // Generate indents and convert to String
-  protected def indentString(indent: Int) = {
-    val sb = new StringBuilder()
-    for (i <- 1 to indent) {
-      sb.append(" ")
-    }
-    sb.toString()
-  }
-  
-  protected def getRddName(rdd: RDD[_]) = {
-    var rddName = rdd.getClass.getName
-    if (rdd.name != null) {
-      rddName = rdd.name 
-    }
-    rddName
-  }
-  
-  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
-    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
-    rdd.dependencies.foreach{ dep => dep match {
-        case shufDep: ShuffleDependency[_,_] => 
-          val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
-          jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
-        case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
-      }
-    }
-  }
-  
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
-    var stageInfo: String = ""
-    if (stage.isShuffleMap) {
-      stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + 
-                  stage.shuffleDep.get.shuffleId
-    }else{
-      stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
-    }
-    if (stage.jobId == jobID) {
-      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      recordRddInStageGraph(jobID, stage.rdd, indent)
-      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
-    } else
-      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
-  }
-  
-  // Record task metrics into job log files
-  protected def recordTaskMetrics(stageID: Int, status: String, 
-                                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
-    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + 
-               " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + 
-               " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
-    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
-    val readMetrics = 
-      taskMetrics.shuffleReadMetrics match {
-        case Some(metrics) => 
-          " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + 
-          " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + 
-          " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + 
-          " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + 
-          " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + 
-          " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + 
-          " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
-        case None => ""
-      }
-    val writeMetrics = 
-      taskMetrics.shuffleWriteMetrics match {
-        case Some(metrics) => 
-          " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
-        case None => ""
-      }
-    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
-  }
-  
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-    stageLogInfo(
-      stageSubmitted.stage.id,
-      "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
-        stageSubmitted.stage.id, stageSubmitted.taskSize))
-  }
-  
-  override def onStageCompleted(stageCompleted: StageCompleted) {
-    stageLogInfo(
-      stageCompleted.stageInfo.stage.id,
-      "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
-    
-  }
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val task = taskEnd.task
-    val taskInfo = taskEnd.taskInfo
-    var taskStatus = ""
-    task match {
-      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
-      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
-    }
-    taskEnd.reason match {
-      case Success => taskStatus += " STATUS=SUCCESS"
-        recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
-      case Resubmitted => 
-        taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + 
-                      " STAGE_ID=" + task.stageId
-        stageLogInfo(task.stageId, taskStatus)
-      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => 
-        taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + 
-                      task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + 
-                      mapId + " REDUCE_ID=" + reduceId
-        stageLogInfo(task.stageId, taskStatus)
-      case OtherFailure(message) => 
-        taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + 
-                      " STAGE_ID=" + task.stageId + " INFO=" + message
-        stageLogInfo(task.stageId, taskStatus)
-      case _ =>
-    }
-  }
-  
-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    val job = jobEnd.job
-    var info = "JOB_ID=" + job.jobId
-    jobEnd.jobResult match {
-      case JobSucceeded => info += " STATUS=SUCCESS"
-      case JobFailed(exception, _) =>
-        info += " STATUS=FAILED REASON="
-        exception.getMessage.split("\\s+").foreach(info += _ + "_")
-      case _ =>
-    }
-    jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
-    closeLogWriter(job.jobId)
-  }
-
-  protected def recordJobProperties(jobID: Int, properties: Properties) {
-    if(properties != null) {
-      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobID, description, false)
-    }
-  }
-
-  override def onJobStart(jobStart: SparkListenerJobStart) {
-    val job = jobStart.job
-    val properties = jobStart.properties
-    createLogWriter(job.jobId)
-    recordJobProperties(job.jobId, properties)
-    buildJobDep(job.jobId, job.finalStage)
-    recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage)
-    jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
-  }
-}
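
JobLogger's on-disk layout is one directory per logger instance under SPARK_LOG_DIR (defaulting to /tmp/spark) and one plain-text file per job ID, with each record optionally prefixed by a yyyy/MM/dd HH:mm:ss timestamp. A condensed, standalone sketch of that write path (the helper here is illustrative, not part of the class above):

    import java.io.{File, FileWriter, PrintWriter}
    import java.text.SimpleDateFormat
    import java.util.Date

    val logRoot = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark")
    val runDir = new File(logRoot, String.valueOf(System.currentTimeMillis()))
    if (!runDir.mkdirs() && !runDir.exists()) sys.error("could not create " + runDir)

    // One file per job ID; append a line, optionally timestamped as jobLogInfo does above.
    def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true): Unit = {
      val fmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
      val line = if (withTime) fmt.format(new Date()) + ": " + info else info
      val writer = new PrintWriter(new FileWriter(new File(runDir, jobId.toString), true))
      try writer.println(line) finally writer.close()
    }

    jobLogInfo(0, "JOB_ID=0 STATUS=STARTED")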

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
deleted file mode 100644
index a61b335..0000000
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * A result of a job in the DAGScheduler.
- */
-private[spark] sealed trait JobResult
-
-private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
deleted file mode 100644
index 69cd161..0000000
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
- * results to the given handler function.
- */
-private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
-  extends JobListener {
-
-  private var finishedTasks = 0
-
-  private var jobFinished = false          // Is the job as a whole finished (succeeded or failed)?
-  private var jobResult: JobResult = null  // If the job is finished, this will be its result
-
-  override def taskSucceeded(index: Int, result: Any) {
-    synchronized {
-      if (jobFinished) {
-        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
-      }
-      resultHandler(index, result.asInstanceOf[T])
-      finishedTasks += 1
-      if (finishedTasks == totalTasks) {
-        jobFinished = true
-        jobResult = JobSucceeded
-        this.notifyAll()
-      }
-    }
-  }
-
-  override def jobFailed(exception: Exception) {
-    synchronized {
-      if (jobFinished) {
-        throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
-      }
-      jobFinished = true
-      jobResult = JobFailed(exception, None)
-      this.notifyAll()
-    }
-  }
-
-  def awaitResult(): JobResult = synchronized {
-    while (!jobFinished) {
-      this.wait()
-    }
-    return jobResult
-  }
-}
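
JobWaiter above is the classic synchronized wait/notifyAll handshake: the listener callbacks update shared state under the monitor and wake awaitResult, which loops on the finished flag until the job succeeds or fails. A generic sketch of the same handshake, independent of the Spark types:

    // Callers block in awaitResult() until complete() flips the flag and notifies them.
    class SimpleWaiter[T] {
      private var finished = false
      private var result: Option[T] = None

      def complete(value: T): Unit = synchronized {
        if (finished) throw new IllegalStateException("already completed")
        finished = true
        result = Some(value)
        notifyAll()
      }

      def awaitResult(): T = synchronized {
        while (!finished) wait()   // loop guards against spurious wakeups
        result.get
      }
    }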

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
deleted file mode 100644
index 2f6a68e..0000000
--- a/core/src/main/scala/spark/scheduler/MapStatus.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.storage.BlockManagerId
-import java.io.{ObjectOutput, ObjectInput, Externalizable}
-
-/**
- * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
- * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
- */
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
-  extends Externalizable {
-
-  def this() = this(null, null)  // For deserialization only
-
-  def writeExternal(out: ObjectOutput) {
-    location.writeExternal(out)
-    out.writeInt(compressedSizes.length)
-    out.write(compressedSizes)
-  }
-
-  def readExternal(in: ObjectInput) {
-    location = BlockManagerId(in)
-    compressedSizes = new Array[Byte](in.readInt())
-    in.readFully(compressedSizes)
-  }
-}
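
MapStatus controls its own wire format via Externalizable: a no-arg constructor for deserialization plus writeExternal/readExternal that emit a length-prefixed byte array (the compressed per-reducer sizes). The stand-in class below, which drops the BlockManagerId field, shows the same round trip end to end:

    import java.io._

    // Stand-in with MapStatus's length-prefixed byte-array encoding (no BlockManagerId).
    class SizesHolder(var compressedSizes: Array[Byte]) extends Externalizable {
      def this() = this(null)   // required by Externalizable for deserialization

      override def writeExternal(out: ObjectOutput): Unit = {
        out.writeInt(compressedSizes.length)
        out.write(compressedSizes)
      }
      override def readExternal(in: ObjectInput): Unit = {
        compressedSizes = new Array[Byte](in.readInt())
        in.readFully(compressedSizes)
      }
    }

    // Round trip through Java serialization.
    val buf = new ByteArrayOutputStream
    val oos = new ObjectOutputStream(buf)
    oos.writeObject(new SizesHolder(Array[Byte](1, 2, 3)))
    oos.close()
    val restored = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[SizesHolder]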

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
deleted file mode 100644
index d066df5..0000000
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark._
-import java.io._
-import util.{MetadataCleaner, TimeStampedHashMap}
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-private[spark] object ResultTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
-
-  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        return old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(func)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        return bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
-    val loader = Thread.currentThread.getContextClassLoader
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val ser = SparkEnv.get.closureSerializer.newInstance
-    val objIn = ser.deserializeStream(in)
-    val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-    val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
-    return (rdd, func)
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
-
-private[spark] class ResultTask[T, U](
-    stageId: Int,
-    var rdd: RDD[T],
-    var func: (TaskContext, Iterator[T]) => U,
-    var partition: Int,
-    @transient locs: Seq[TaskLocation],
-    val outputId: Int)
-  extends Task[U](stageId) with Externalizable {
-
-  def this() = this(0, null, null, 0, null, 0)
-
-  var split = if (rdd == null) {
-    null
-  } else {
-    rdd.partitions(partition)
-  }
-
-  @transient private val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
-  }
-
-  override def run(attemptId: Long): U = {
-    val context = new TaskContext(stageId, partition, attemptId)
-    metrics = Some(context.taskMetrics)
-    try {
-      func(context, rdd.iterator(split, context))
-    } finally {
-      context.executeOnCompleteCallbacks()
-    }
-  }
-
-  override def preferredLocations: Seq[TaskLocation] = preferredLocs
-
-  override def toString = "ResultTask(" + stageId + ", " + partition + ")"
-
-  override def writeExternal(out: ObjectOutput) {
-    RDDCheckpointData.synchronized {
-      split = rdd.partitions(partition)
-      out.writeInt(stageId)
-      val bytes = ResultTask.serializeInfo(
-        stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
-      out.writeInt(bytes.length)
-      out.write(bytes)
-      out.writeInt(partition)
-      out.writeInt(outputId)
-      out.writeLong(epoch)
-      out.writeObject(split)
-    }
-  }
-
-  override def readExternal(in: ObjectInput) {
-    val stageId = in.readInt()
-    val numBytes = in.readInt()
-    val bytes = new Array[Byte](numBytes)
-    in.readFully(bytes)
-    val (rdd_, func_) = ResultTask.deserializeInfo(stageId, bytes)
-    rdd = rdd_.asInstanceOf[RDD[T]]
-    func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
-    partition = in.readInt()
-    val outputId = in.readInt()
-    epoch = in.readLong()
-    split = in.readObject().asInstanceOf[Partition]
-  }
-}
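
The serializedInfoCache above exists so the heavyweight (RDD, func) pair is serialized and gzip-compressed once per stage instead of once per task; each task then only writes its small per-partition fields. A generic sketch of that memoization pattern, with plain Java serialization and a plain HashMap standing in for Spark's closure serializer and TimeStampedHashMap:

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}
    import java.util.zip.GZIPOutputStream
    import scala.collection.mutable.HashMap

    object StageInfoCache {
      private val cache = new HashMap[Int, Array[Byte]]

      // Serialize (and gzip) the payload only on the first request for a stage id.
      def getOrSerialize(stageId: Int, payload: Serializable): Array[Byte] = synchronized {
        cache.getOrElseUpdate(stageId, {
          val bytes = new ByteArrayOutputStream
          val objOut = new ObjectOutputStream(new GZIPOutputStream(bytes))
          objOut.writeObject(payload)
          objOut.close()
          bytes.toByteArray
        })
      }

      def clear(): Unit = synchronized { cache.clear() }
    }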

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
deleted file mode 100644
index f2a0385..0000000
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
-
-import spark._
-import spark.executor.ShuffleWriteMetrics
-import spark.storage._
-import spark.util.{TimeStampedHashMap, MetadataCleaner}
-
-
-private[spark] object ShuffleMapTask {
-
-  // A simple map between the stage id to the serialized byte array of a task.
-  // Served as a cache for task serialization because serialization can be
-  // expensive on the master node if it needs to launch thousands of tasks.
-  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
-
-  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
-
-  def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
-    synchronized {
-      val old = serializedInfoCache.get(stageId).orNull
-      if (old != null) {
-        return old
-      } else {
-        val out = new ByteArrayOutputStream
-        val ser = SparkEnv.get.closureSerializer.newInstance()
-        val objOut = ser.serializeStream(new GZIPOutputStream(out))
-        objOut.writeObject(rdd)
-        objOut.writeObject(dep)
-        objOut.close()
-        val bytes = out.toByteArray
-        serializedInfoCache.put(stageId, bytes)
-        return bytes
-      }
-    }
-  }
-
-  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
-    synchronized {
-      val loader = Thread.currentThread.getContextClassLoader
-      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-      val ser = SparkEnv.get.closureSerializer.newInstance()
-      val objIn = ser.deserializeStream(in)
-      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
-      val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
-      return (rdd, dep)
-    }
-  }
-
-  // Since both the JarSet and FileSet have the same format this is used for both.
-  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
-    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
-    val objIn = new ObjectInputStream(in)
-    val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
-    return (HashMap(set.toSeq: _*))
-  }
-
-  def clearCache() {
-    synchronized {
-      serializedInfoCache.clear()
-    }
-  }
-}
-
-private[spark] class ShuffleMapTask(
-    stageId: Int,
-    var rdd: RDD[_],
-    var dep: ShuffleDependency[_,_],
-    var partition: Int,
-    @transient private var locs: Seq[TaskLocation])
-  extends Task[MapStatus](stageId)
-  with Externalizable
-  with Logging {
-
-  protected def this() = this(0, null, null, 0, null)
-
-  @transient private val preferredLocs: Seq[TaskLocation] = {
-    if (locs == null) Nil else locs.toSet.toSeq
-  }
-
-  var split = if (rdd == null) null else rdd.partitions(partition)
-
-  override def writeExternal(out: ObjectOutput) {
-    RDDCheckpointData.synchronized {
-      split = rdd.partitions(partition)
-      out.writeInt(stageId)
-      val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
-      out.writeInt(bytes.length)
-      out.write(bytes)
-      out.writeInt(partition)
-      out.writeLong(epoch)
-      out.writeObject(split)
-    }
-  }
-
-  override def readExternal(in: ObjectInput) {
-    val stageId = in.readInt()
-    val numBytes = in.readInt()
-    val bytes = new Array[Byte](numBytes)
-    in.readFully(bytes)
-    val (rdd_, dep_) = ShuffleMapTask.deserializeInfo(stageId, bytes)
-    rdd = rdd_
-    dep = dep_
-    partition = in.readInt()
-    epoch = in.readLong()
-    split = in.readObject().asInstanceOf[Partition]
-  }
-
-  override def run(attemptId: Long): MapStatus = {
-    val numOutputSplits = dep.partitioner.numPartitions
-
-    val taskContext = new TaskContext(stageId, partition, attemptId)
-    metrics = Some(taskContext.taskMetrics)
-
-    val blockManager = SparkEnv.get.blockManager
-    var shuffle: ShuffleBlocks = null
-    var buckets: ShuffleWriterGroup = null
-
-    try {
-      // Obtain all the block writers for shuffle blocks.
-      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
-      buckets = shuffle.acquireWriters(partition)
-
-      // Write the map output to its associated buckets.
-      for (elem <- rdd.iterator(split, taskContext)) {
-        val pair = elem.asInstanceOf[Product2[Any, Any]]
-        val bucketId = dep.partitioner.getPartition(pair._1)
-        buckets.writers(bucketId).write(pair)
-      }
-
-      // Commit the writes. Get the size of each bucket block (total block size).
-      var totalBytes = 0L
-      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
-        writer.commit()
-        writer.close()
-        val size = writer.size()
-        totalBytes += size
-        MapOutputTracker.compressSize(size)
-      }
-
-      // Update shuffle metrics.
-      val shuffleMetrics = new ShuffleWriteMetrics
-      shuffleMetrics.shuffleBytesWritten = totalBytes
-      metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
-
-      return new MapStatus(blockManager.blockManagerId, compressedSizes)
-    } catch { case e: Exception =>
-      // If there is an exception from running the task, revert the partial writes
-      // and throw the exception upstream to Spark.
-      if (buckets != null) {
-        buckets.writers.foreach(_.revertPartialWrites())
-      }
-      throw e
-    } finally {
-      // Release the writers back to the shuffle block manager.
-      if (shuffle != null && buckets != null) {
-        shuffle.releaseWriters(buckets)
-      }
-      // Execute the callbacks on task completion.
-      taskContext.executeOnCompleteCallbacks()
-    }
-  }
-
-  override def preferredLocations: Seq[TaskLocation] = preferredLocs
-
-  override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
-}
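
The heart of run() above is routing every (key, value) record to a shuffle bucket chosen by dep.partitioner and then reporting per-bucket sizes through MapStatus. The standalone sketch below shows just that routing step, with non-negative hash-modulo partitioning standing in for partitioner.getPartition:

    import scala.collection.mutable.ArrayBuffer

    // Route each record to one of numBuckets buckets, as the shuffle write loop does.
    def bucketRecords(records: Iterator[(Any, Any)], numBuckets: Int): Array[ArrayBuffer[(Any, Any)]] = {
      val buckets = Array.fill(numBuckets)(new ArrayBuffer[(Any, Any)])
      for (pair <- records) {
        // Stand-in for dep.partitioner.getPartition(pair._1): non-negative hash modulo.
        val bucketId = ((pair._1.hashCode % numBuckets) + numBuckets) % numBuckets
        buckets(bucketId) += pair
      }
      buckets
    }

    val buckets = bucketRecords(Iterator(("a", 1), ("b", 2), ("a", 3)), numBuckets = 4)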

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
deleted file mode 100644
index e553101..0000000
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.Properties
-import spark.scheduler.cluster.TaskInfo
-import spark.util.Distribution
-import spark.{Logging, SparkContext, TaskEndReason, Utils}
-import spark.executor.TaskMetrics
-
-sealed trait SparkListenerEvents
-
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
-     extends SparkListenerEvents
-
-case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
-
-case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
-
-case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
-     taskMetrics: TaskMetrics) extends SparkListenerEvents
-
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
-     extends SparkListenerEvents
-
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
-     extends SparkListenerEvents
-
-trait SparkListener {
-  /**
-   * Called when a stage is completed, with information on the completed stage
-   */
-  def onStageCompleted(stageCompleted: StageCompleted) { }
-
-  /**
-   * Called when a stage is submitted
-   */
-  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
-  /**
-   * Called when a task starts
-   */
-  def onTaskStart(taskEnd: SparkListenerTaskStart) { }
-
-  /**
-   * Called when a task ends
-   */
-  def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
-
-  /**
-   * Called when a job starts
-   */
-  def onJobStart(jobStart: SparkListenerJobStart) { }
-
-  /**
-   * Called when a job ends
-   */
-  def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-
-}
-
-/**
- * Simple SparkListener that logs a few summary statistics when each stage completes
- */
-class StatsReportListener extends SparkListener with Logging {
-  override def onStageCompleted(stageCompleted: StageCompleted) {
-    import spark.scheduler.StatsReportListener._
-    implicit val sc = stageCompleted
-    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
-    showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
-
-    //shuffle write
-    showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
-
-    //fetch & io
-    showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
-    showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
-    showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
-
-    //runtime breakdown
-
-    val runtimePcts = stageCompleted.stageInfo.taskInfos.map{
-      case (info, metrics) => RuntimePercentage(info.duration, metrics)
-    }
-    showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
-    showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
-    showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
-  }
-
-}
-
-object StatsReportListener extends Logging {
-
-  //for profiling, the extremes are more interesting
-  val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
-  val probabilities = percentiles.map{_ / 100.0}
-  val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
-
-  def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
-    Distribution(stage.stageInfo.taskInfos.flatMap{
-      case ((info,metric)) => getMetric(info, metric)})
-  }
-
-  //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
-    extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
-  }
-
-  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
-    val stats = d.statCounter
-    logInfo(heading + stats)
-    val quantiles = d.getQuantiles(probabilities).map{formatNumber}
-    logInfo(percentilesHeader)
-    logInfo("\t" + quantiles.mkString("\t"))
-  }
-
-  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
-    dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
-  }
-
-  def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
-    def f(d:Double) = format.format(d)
-    showDistribution(heading, dOpt, f _)
-  }
-
-  def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    (implicit stage: StageCompleted) {
-    showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
-  }
-
-  def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
-    showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
-  }
-
-  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
-    dOpt.foreach{dist => showBytesDistribution(heading, dist)}
-  }
-
-  def showBytesDistribution(heading: String, dist: Distribution) {
-    showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
-  }
-
-  def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
-    showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
-  }
-
-  def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
-    (implicit stage: StageCompleted) {
-    showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
-  }
-
-
-
-  val seconds = 1000L
-  val minutes = seconds * 60
-  val hours = minutes * 60
-
-  /**
-   * reformat a time interval in milliseconds to a prettier format for output
-   */
-  def millisToString(ms: Long) = {
-    val (size, units) =
-      if (ms > hours) {
-        (ms.toDouble / hours, "hours")
-      } else if (ms > minutes) {
-        (ms.toDouble / minutes, "min")
-      } else if (ms > seconds) {
-        (ms.toDouble / seconds, "s")
-      } else {
-        (ms.toDouble, "ms")
-      }
-    "%.1f %s".format(size, units)
-  }
-}
-
-
-
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
-  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
-    val denom = totalTime.toDouble
-    val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
-    val fetch = fetchTime.map{_ / denom}
-    val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
-    val other = 1.0 - (exec + fetch.getOrElse(0d))
-    RuntimePercentage(exec, fetch, other)
-  }
-}
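
RuntimePercentage at the end of the file splits a task's wall-clock time into three fractions: executor compute (executor run time minus shuffle fetch wait), fetch wait, and everything else (scheduling delay, serialization, result transfer). A worked example with made-up numbers:

    // Hypothetical task: 1000 ms total, 600 ms executor run time, of which 250 ms
    // was spent waiting on shuffle fetches.
    val totalTime = 1000.0
    val executorRunTime = 600.0
    val fetchWaitTime = Some(250.0)

    val fetch = fetchWaitTime.map(_ / totalTime)                              // Some(0.25)
    val exec = (executorRunTime - fetchWaitTime.getOrElse(0.0)) / totalTime   // 0.35
    val other = 1.0 - (exec + fetch.getOrElse(0.0))                           // 0.40
    println(f"exec=$exec%.2f fetch=${fetch.getOrElse(0.0)}%.2f other=$other%.2f")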

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
deleted file mode 100644
index f55ed45..0000000
--- a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import spark.Logging
-
-/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
-  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000 
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-
-  new Thread("SparkListenerBus") {
-    setDaemon(true)
-    override def run() {
-      while (true) {
-        val event = eventQueue.take
-        event match {
-          case stageSubmitted: SparkListenerStageSubmitted =>
-            sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: StageCompleted =>
-            sparkListeners.foreach(_.onStageCompleted(stageCompleted))
-          case jobStart: SparkListenerJobStart =>
-            sparkListeners.foreach(_.onJobStart(jobStart))
-          case jobEnd: SparkListenerJobEnd =>
-            sparkListeners.foreach(_.onJobEnd(jobEnd))
-          case taskStart: SparkListenerTaskStart =>
-            sparkListeners.foreach(_.onTaskStart(taskStart))
-          case taskEnd: SparkListenerTaskEnd =>
-            sparkListeners.foreach(_.onTaskEnd(taskEnd))
-          case _ =>
-        }
-      }
-    }
-  }.start()
-
-  def addListener(listener: SparkListener) {
-    sparkListeners += listener
-  }
-
-  def post(event: SparkListenerEvents) {
-    val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
-    }
-  }
-}
-
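
SparkListenerBus above is a single-consumer event bus: a daemon thread blocks on a bounded LinkedBlockingQueue and fans each event out to the registered listeners, while post() drops events, logging the problem only once, if the queue ever fills up. The generic sketch below keeps that structure without the Spark event types:

    import java.util.concurrent.LinkedBlockingQueue

    class TinyEventBus[E](capacity: Int, handler: E => Unit) {
      private val queue = new LinkedBlockingQueue[E](capacity)
      @volatile private var dropLogged = false

      // Single daemon consumer thread, mirroring the bus above.
      new Thread("TinyEventBus") {
        setDaemon(true)
        override def run(): Unit = while (true) handler(queue.take())
      }.start()

      // Non-blocking producer side: drop (and complain once) when the queue is full.
      def post(event: E): Unit = {
        if (!queue.offer(event) && !dropLogged) {
          System.err.println("Dropping event: queue is full; a handler is too slow")
          dropLogged = true
        }
      }
    }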

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SplitInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SplitInfo.scala b/core/src/main/scala/spark/scheduler/SplitInfo.scala
deleted file mode 100644
index 4e3661e..0000000
--- a/core/src/main/scala/spark/scheduler/SplitInfo.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import collection.mutable.ArrayBuffer
-
-// information about a specific split instance : handles both split instances.
-// So that we do not need to worry about the differences.
-class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
-                val length: Long, val underlyingSplit: Any) {
-  override def toString(): String = {
-    "SplitInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz +
-      ", hostLocation : " + hostLocation + ", path : " + path +
-      ", length : " + length + ", underlyingSplit " + underlyingSplit
-  }
-
-  override def hashCode(): Int = {
-    var hashCode = inputFormatClazz.hashCode
-    hashCode = hashCode * 31 + hostLocation.hashCode
-    hashCode = hashCode * 31 + path.hashCode
-    // ignore overflow ? It is hashcode anyway !
-    hashCode = hashCode * 31 + (length & 0x7fffffff).toInt
-    hashCode
-  }
-
-  // This is practically useless since most of the Split impl's dont seem to implement equals :-(
-  // So unless there is identity equality between underlyingSplits, it will always fail even if it
-  // is pointing to same block.
-  override def equals(other: Any): Boolean = other match {
-    case that: SplitInfo => {
-      this.hostLocation == that.hostLocation &&
-        this.inputFormatClazz == that.inputFormatClazz &&
-        this.path == that.path &&
-        this.length == that.length &&
-        // other split specific checks (like start for FileSplit)
-        this.underlyingSplit == that.underlyingSplit
-    }
-    case _ => false
-  }
-}
-
-object SplitInfo {
-
-  def toSplitInfo(inputFormatClazz: Class[_], path: String,
-                  mapredSplit: org.apache.hadoop.mapred.InputSplit): Seq[SplitInfo] = {
-    val retval = new ArrayBuffer[SplitInfo]()
-    val length = mapredSplit.getLength
-    for (host <- mapredSplit.getLocations) {
-      retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit)
-    }
-    retval
-  }
-
-  def toSplitInfo(inputFormatClazz: Class[_], path: String,
-                  mapreduceSplit: org.apache.hadoop.mapreduce.InputSplit): Seq[SplitInfo] = {
-    val retval = new ArrayBuffer[SplitInfo]()
-    val length = mapreduceSplit.getLength
-    for (host <- mapreduceSplit.getLocations) {
-      retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit)
-    }
-    retval
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
deleted file mode 100644
index c599c00..0000000
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.net.URI
-
-import spark._
-import spark.storage.BlockManagerId
-
-/**
- * A stage is a set of independent tasks all computing the same function that need to run as part
- * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
- * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
- * DAGScheduler runs these stages in topological order.
- *
- * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
- * another stage, or a result stage, in which case its tasks directly compute the action that
- * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
- * that each output partition is on.
- *
- * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
- * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
- * faster on failure.
- */
-private[spark] class Stage(
-    val id: Int,
-    val rdd: RDD[_],
-    val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
-    val parents: List[Stage],
-    val jobId: Int,
-    callSite: Option[String])
-  extends Logging {
-
-  val isShuffleMap = shuffleDep != None
-  val numPartitions = rdd.partitions.size
-  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
-  var numAvailableOutputs = 0
-
-  /** When first task was submitted to scheduler. */
-  var submissionTime: Option[Long] = None
-  var completionTime: Option[Long] = None
-
-  private var nextAttemptId = 0
-
-  def isAvailable: Boolean = {
-    if (!isShuffleMap) {
-      true
-    } else {
-      numAvailableOutputs == numPartitions
-    }
-  }
-
-  def addOutputLoc(partition: Int, status: MapStatus) {
-    val prevList = outputLocs(partition)
-    outputLocs(partition) = status :: prevList
-    if (prevList == Nil)
-      numAvailableOutputs += 1
-  }
-
-  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
-    val prevList = outputLocs(partition)
-    val newList = prevList.filterNot(_.location == bmAddress)
-    outputLocs(partition) = newList
-    if (prevList != Nil && newList == Nil) {
-      numAvailableOutputs -= 1
-    }
-  }
-
-  def removeOutputsOnExecutor(execId: String) {
-    var becameUnavailable = false
-    for (partition <- 0 until numPartitions) {
-      val prevList = outputLocs(partition)
-      val newList = prevList.filterNot(_.location.executorId == execId)
-      outputLocs(partition) = newList
-      if (prevList != Nil && newList == Nil) {
-        becameUnavailable = true
-        numAvailableOutputs -= 1
-      }
-    }
-    if (becameUnavailable) {
-      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
-        this, execId, numAvailableOutputs, numPartitions, isAvailable))
-    }
-  }
-
-  def newAttemptId(): Int = {
-    val id = nextAttemptId
-    nextAttemptId += 1
-    return id
-  }
-
-  val name = callSite.getOrElse(rdd.origin)
-
-  override def toString = "Stage " + id
-
-  override def hashCode(): Int = id
-}
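
The per-partition bookkeeping in Stage is easiest to see in isolation. The following is a simplified, standalone model (not the Spark class itself; SimpleStage and the executor ids are illustrative): a partition counts as available once at least one output location is registered for it, and a shuffle map stage is available only when every partition has an output.

object StageBookkeepingSketch {
  class SimpleStage(numPartitions: Int) {
    // Executor ids holding each partition's map output, newest first.
    private val outputLocs = Array.fill[List[String]](numPartitions)(Nil)
    private var numAvailableOutputs = 0

    def isAvailable: Boolean = numAvailableOutputs == numPartitions

    def addOutputLoc(partition: Int, execId: String) {
      val prev = outputLocs(partition)
      outputLocs(partition) = execId :: prev
      if (prev == Nil) numAvailableOutputs += 1
    }

    def removeOutputsOnExecutor(execId: String) {
      for (p <- 0 until numPartitions) {
        val prev = outputLocs(p)
        val next = prev.filterNot(_ == execId)
        outputLocs(p) = next
        if (prev != Nil && next == Nil) numAvailableOutputs -= 1
      }
    }
  }

  def main(args: Array[String]) {
    val stage = new SimpleStage(2)
    stage.addOutputLoc(0, "exec-1")
    stage.addOutputLoc(1, "exec-1")
    println(stage.isAvailable)            // true: every partition has an output
    stage.removeOutputsOnExecutor("exec-1")
    println(stage.isAvailable)            // false: that executor's outputs are gone
  }
}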

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/StageInfo.scala b/core/src/main/scala/spark/scheduler/StageInfo.scala
deleted file mode 100644
index c4026f9..0000000
--- a/core/src/main/scala/spark/scheduler/StageInfo.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.TaskInfo
-import scala.collection._
-import spark.executor.TaskMetrics
-
-case class StageInfo(
-    val stage: Stage,
-    val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
-) {
-  override def toString = stage.rdd.toString
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
deleted file mode 100644
index 0ab2ae6..0000000
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.serializer.SerializerInstance
-import java.io.{DataInputStream, DataOutputStream}
-import java.nio.ByteBuffer
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.util.ByteBufferInputStream
-import scala.collection.mutable.HashMap
-import spark.executor.TaskMetrics
-
-/**
- * A task to execute on a worker node.
- */
-private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
-  def run(attemptId: Long): T
-  def preferredLocations: Seq[TaskLocation] = Nil
-
-  var epoch: Long = -1   // Map output tracker epoch. Will be set by TaskScheduler.
-
-  var metrics: Option[TaskMetrics] = None
-
-}
-
-/**
- * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
- * need to send the list of JARs and files added to the SparkContext with each task to ensure that
- * worker nodes find out about it, but we can't make it part of the Task because the user's code in
- * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
- * first writing out its dependencies.
- */
-private[spark] object Task {
-  /**
-   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
-   */
-  def serializeWithDependencies(
-      task: Task[_],
-      currentFiles: HashMap[String, Long],
-      currentJars: HashMap[String, Long],
-      serializer: SerializerInstance)
-    : ByteBuffer = {
-
-    val out = new FastByteArrayOutputStream(4096)
-    val dataOut = new DataOutputStream(out)
-
-    // Write currentFiles
-    dataOut.writeInt(currentFiles.size)
-    for ((name, timestamp) <- currentFiles) {
-      dataOut.writeUTF(name)
-      dataOut.writeLong(timestamp)
-    }
-
-    // Write currentJars
-    dataOut.writeInt(currentJars.size)
-    for ((name, timestamp) <- currentJars) {
-      dataOut.writeUTF(name)
-      dataOut.writeLong(timestamp)
-    }
-
-    // Write the task itself and finish
-    dataOut.flush()
-    val taskBytes = serializer.serialize(task).array()
-    out.write(taskBytes)
-    out.trim()
-    ByteBuffer.wrap(out.array)
-  }
-
-  /**
-   * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
-   * and return the task itself as a serialized ByteBuffer. The caller can then update its
-   * ClassLoaders and deserialize the task.
-   *
-   * @return (taskFiles, taskJars, taskBytes)
-   */
-  def deserializeWithDependencies(serializedTask: ByteBuffer)
-    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
-
-    val in = new ByteBufferInputStream(serializedTask)
-    val dataIn = new DataInputStream(in)
-
-    // Read task's files
-    val taskFiles = new HashMap[String, Long]()
-    val numFiles = dataIn.readInt()
-    for (i <- 0 until numFiles) {
-      taskFiles(dataIn.readUTF()) = dataIn.readLong()
-    }
-
-    // Read task's JARs
-    val taskJars = new HashMap[String, Long]()
-    val numJars = dataIn.readInt()
-    for (i <- 0 until numJars) {
-      taskJars(dataIn.readUTF()) = dataIn.readLong()
-    }
-
-    // Create a sub-buffer for the rest of the data, which is the serialized Task object
-    val subBuffer = serializedTask.slice()  // ByteBufferInputStream will have read just up to task
-    (taskFiles, taskJars, subBuffer)
-  }
-}
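
The framing used by serializeWithDependencies is worth spelling out: the file and JAR maps (name -> timestamp) are length-prefixed and written before the serialized task bytes, so an executor can read the dependency lists without yet having the task's classes on its classpath. The following standalone sketch (not the Spark code; names and values are illustrative) round-trips that framing with plain Java streams.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.HashMap

object TaskFramingSketch {
  def main(args: Array[String]) {
    val files = HashMap("app.conf" -> 1L)
    val jars  = HashMap("app.jar" -> 2L)

    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    // Write each map as a count followed by (name, timestamp) pairs.
    for (m <- Seq(files, jars)) {
      out.writeInt(m.size)
      for ((name, ts) <- m) { out.writeUTF(name); out.writeLong(ts) }
    }
    out.write(Array[Byte](1, 2, 3))   // stand-in for the serialized Task payload
    out.flush()

    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    def readMap() = {
      val m = new HashMap[String, Long]()
      for (i <- 0 until in.readInt()) m(in.readUTF()) = in.readLong()
      m
    }
    println(readMap())   // the files map
    println(readMap())   // the jars map
    // Whatever remains in the stream is the task payload itself.
  }
}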

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskLocation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskLocation.scala b/core/src/main/scala/spark/scheduler/TaskLocation.scala
deleted file mode 100644
index fea117e..0000000
--- a/core/src/main/scala/spark/scheduler/TaskLocation.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * A location where a task should run. This can either be a host or a (host, executorID) pair.
- * In the latter case, we will prefer to launch the task on that executorID, but our next level
- * of preference will be executors on the same host if this is not possible.
- */
-private[spark]
-class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
-  override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
-}
-
-private[spark] object TaskLocation {
-  def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
-
-  def apply(host: String) = new TaskLocation(host, None)
-}
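
A rough usage sketch of the two factory methods above (assuming the code compiles with access to the private[spark] class; host and executor ids are illustrative):

import spark.scheduler.TaskLocation

object TaskLocationSketch {
  def main(args: Array[String]) {
    val hostOnly     = TaskLocation("host1")              // any executor on host1
    val specificExec = TaskLocation("host1", "exec-7")    // prefer exec-7, then host1
    println(hostOnly)       // TaskLocation(host1, None)
    println(specificExec)   // TaskLocation(host1, Some(exec-7))
  }
}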

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
deleted file mode 100644
index fc48567..0000000
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io._
-
-import scala.collection.mutable.Map
-import spark.executor.TaskMetrics
-import spark.{Utils, SparkEnv}
-import java.nio.ByteBuffer
-
-// Task result. Also contains updates to accumulator variables.
-// TODO: Use of distributed cache to return result is a hack to get around
-// what seems to be a bug with messages over 60KB in libprocess; fix it
-private[spark]
-class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
-  extends Externalizable
-{
-  def this() = this(null.asInstanceOf[T], null, null)
-
-  override def writeExternal(out: ObjectOutput) {
-
-    val objectSer = SparkEnv.get.serializer.newInstance()
-    val bb = objectSer.serialize(value)
-
-    out.writeInt(bb.remaining())
-    Utils.writeByteBuffer(bb, out)
-
-    out.writeInt(accumUpdates.size)
-    for ((key, value) <- accumUpdates) {
-      out.writeLong(key)
-      out.writeObject(value)
-    }
-    out.writeObject(metrics)
-  }
-
-  override def readExternal(in: ObjectInput) {
-
-    val objectSer = SparkEnv.get.serializer.newInstance()
-
-    val blen = in.readInt()
-    val byteVal = new Array[Byte](blen)
-    in.readFully(byteVal)
-    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
-
-    val numUpdates = in.readInt
-    if (numUpdates == 0) {
-      accumUpdates = null
-    } else {
-      accumUpdates = Map()
-      for (i <- 0 until numUpdates) {
-        accumUpdates(in.readLong()) = in.readObject()
-      }
-    }
-    metrics = in.readObject().asInstanceOf[TaskMetrics]
-  }
-}
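
TaskResult relies on the Externalizable pattern: the object writes its own fields in writeExternal and restores them in readExternal, which is why the no-arg constructor is required. The sketch below (a simplified stand-in, not the Spark class; PairResult is illustrative) shows that pattern round-tripping through plain Java object streams.

import java.io._

class PairResult(var key: Long, var value: String) extends Externalizable {
  def this() = this(0L, null)   // required by Externalizable
  override def writeExternal(out: ObjectOutput) {
    out.writeLong(key)
    out.writeUTF(value)
  }
  override def readExternal(in: ObjectInput) {
    key = in.readLong()
    value = in.readUTF()
  }
}

object ExternalizableSketch {
  def main(args: Array[String]) {
    val bytes = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bytes)
    oos.writeObject(new PairResult(42L, "ok"))
    oos.close()
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    val restored = ois.readObject().asInstanceOf[PairResult]
    println(restored.key + " " + restored.value)   // 42 ok
  }
}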

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
deleted file mode 100644
index 4943d58..0000000
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.Pool
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-/**
- * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
- * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
- * and are responsible for sending the tasks to the cluster, running them, retrying if there
- * are failures, and mitigating stragglers. They return events to the DAGScheduler through
- * the TaskSchedulerListener interface.
- */
-private[spark] trait TaskScheduler {
-
-  def rootPool: Pool
-
-  def schedulingMode: SchedulingMode
-
-  def start(): Unit
-
-  // Invoked after the system has successfully initialized (typically in SparkContext).
-  // YARN uses this to bootstrap resource allocation based on preferred locations, wait for slave registrations, etc.
-  def postStartHook() { }
-
-  // Disconnect from the cluster.
-  def stop(): Unit
-
-  // Submit a sequence of tasks to run.
-  def submitTasks(taskSet: TaskSet): Unit
-
-  // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
-  def setListener(listener: TaskSchedulerListener): Unit
-
-  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
-  def defaultParallelism(): Int
-}
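
To make the contract concrete, here is a minimal no-op stub of the trait above (an illustrative sketch only, not a usable scheduler, and it would need to live in the spark package since the trait is private[spark]). The DAGScheduler sets a listener, then submits TaskSets; start/stop bracket the scheduler's lifetime. Pool construction is deliberately elided so the sketch stays self-contained.

import spark.scheduler.{TaskScheduler, TaskSchedulerListener, TaskSet}
import spark.scheduler.cluster.Pool
import spark.scheduler.cluster.SchedulingMode
import spark.scheduler.cluster.SchedulingMode.SchedulingMode

class NoOpTaskScheduler extends TaskScheduler {
  private var listener: TaskSchedulerListener = null

  def rootPool: Pool = null                                  // elided in this sketch
  def schedulingMode: SchedulingMode = SchedulingMode.FIFO
  def start() {}
  def stop() {}
  def submitTasks(taskSet: TaskSet) {
    // A real scheduler would hand these tasks to executors; here we only log.
    println("submitted " + taskSet)
  }
  def setListener(l: TaskSchedulerListener) { listener = l }
  def defaultParallelism(): Int = 1
}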

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
deleted file mode 100644
index 64be50b..0000000
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.TaskInfo
-import scala.collection.mutable.Map
-
-import spark.TaskEndReason
-import spark.executor.TaskMetrics
-
-/**
- * Interface for getting events back from the TaskScheduler.
- */
-private[spark] trait TaskSchedulerListener {
-  // A task has started.
-  def taskStarted(task: Task[_], taskInfo: TaskInfo)
-
-  // A task has finished or failed.
-  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
-                taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
-
-  // A node was added to the cluster.
-  def executorGained(execId: String, host: String): Unit
-
-  // A node was lost from the cluster.
-  def executorLost(execId: String): Unit
-
-  // The TaskScheduler wants to abort an entire task set.
-  def taskSetFailed(taskSet: TaskSet, reason: String): Unit
-}
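
An illustrative sketch of the listener side (not the DAGScheduler, which is the real implementor): a listener that simply logs each callback it receives from the TaskScheduler. As with the trait itself, this assumes compilation within the spark package.

import scala.collection.mutable.Map
import spark.TaskEndReason
import spark.executor.TaskMetrics
import spark.scheduler.{Task, TaskSchedulerListener, TaskSet}
import spark.scheduler.cluster.TaskInfo

class LoggingListener extends TaskSchedulerListener {
  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
    println("started " + task)
  }
  def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
                taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
    println("ended " + task + " with " + reason)
  }
  def executorGained(execId: String, host: String) { println("gained " + execId + " on " + host) }
  def executorLost(execId: String) { println("lost " + execId) }
  def taskSetFailed(taskSet: TaskSet, reason: String) { println("failed " + taskSet + ": " + reason) }
}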

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
deleted file mode 100644
index dc3550d..0000000
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.Properties
-
-/**
- * A set of tasks submitted together to the low-level TaskScheduler, usually representing
- * missing partitions of a particular stage.
- */
-private[spark] class TaskSet(
-    val tasks: Array[Task[_]],
-    val stageId: Int,
-    val attempt: Int,
-    val priority: Int,
-    val properties: Properties) {
-  val id: String = stageId + "." + attempt
-
-  override def toString: String = "TaskSet " + id
-}
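
A rough usage sketch (assuming package access to the private[spark] classes; the stage id and priority are illustrative): the TaskSet id is simply "<stageId>.<attempt>", which is what appears in log output.

import java.util.Properties
import spark.scheduler.{Task, TaskSet}

object TaskSetSketch {
  def main(args: Array[String]) {
    val taskSet = new TaskSet(new Array[Task[_]](0), 3, 0, 3, new Properties())
    println(taskSet.id)         // 3.0
    println(taskSet.toString)   // TaskSet 3.0
  }
}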