Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:02 UTC
[18/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
deleted file mode 100644
index 8f1b9b2..0000000
--- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.{Logging, SparkEnv}
-import scala.collection.immutable.Set
-import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.mapreduce.Job
-import org.apache.hadoop.conf.Configuration
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.collection.JavaConversions._
-
-
-/**
- * Parses and holds information about inputFormat (and files) specified as a parameter.
- */
-class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
- val path: String) extends Logging {
-
- var mapreduceInputFormat: Boolean = false
- var mapredInputFormat: Boolean = false
-
- validate()
-
- override def toString(): String = {
- "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
- }
-
- override def hashCode(): Int = {
- var hashCode = inputFormatClazz.hashCode
- hashCode = hashCode * 31 + path.hashCode
- hashCode
- }
-
- // Since we do not canonicalize the path, this can be wrong (e.g. relative vs. absolute path),
- // which is fine: this is a best-effort attempt to remove duplicates.
- override def equals(other: Any): Boolean = other match {
- case that: InputFormatInfo => {
- // not checking config - that should be fine, right ?
- this.inputFormatClazz == that.inputFormatClazz &&
- this.path == that.path
- }
- case _ => false
- }
-
- private def validate() {
- logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path " + path)
-
- try {
- if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
- logDebug("inputformat is from mapreduce package")
- mapreduceInputFormat = true
- }
- else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
- logDebug("inputformat is from mapred package")
- mapredInputFormat = true
- }
- else {
- throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
- " is NOT a supported input format ? does not implement either of the supported hadoop api's")
- }
- }
- catch {
- case e: ClassNotFoundException => {
- throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
- }
- }
- }
-
-
- // This method does not expect failures, since validate has already passed ...
- private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
- val env = SparkEnv.get
- val conf = new JobConf(configuration)
- env.hadoop.addCredentials(conf)
- FileInputFormat.setInputPaths(conf, path)
-
- val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
- ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
- org.apache.hadoop.mapreduce.InputFormat[_, _]]
- val job = new Job(conf)
-
- val retval = new ArrayBuffer[SplitInfo]()
- val list = instance.getSplits(job)
- for (split <- list) {
- retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
- }
-
- return retval.toSet
- }
-
- // This method does not expect failures, since validate has already passed ...
- private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
- val env = SparkEnv.get
- val jobConf = new JobConf(configuration)
- env.hadoop.addCredentials(jobConf)
- FileInputFormat.setInputPaths(jobConf, path)
-
- val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
- ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
- org.apache.hadoop.mapred.InputFormat[_, _]]
-
- val retval = new ArrayBuffer[SplitInfo]()
- instance.getSplits(jobConf, jobConf.getNumMapTasks()).foreach(
- elem => retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, elem)
- )
-
- return retval.toSet
- }
-
- private def findPreferredLocations(): Set[SplitInfo] = {
- logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
- ", inputFormatClazz : " + inputFormatClazz)
- if (mapreduceInputFormat) {
- return prefLocsFromMapreduceInputFormat()
- }
- else {
- assert(mapredInputFormat)
- return prefLocsFromMapredInputFormat()
- }
- }
-}
-
-
-
-
-object InputFormatInfo {
- /**
-  * Computes the preferred locations based on the input(s) and returns a location-to-block map.
-  * Typical use of this method for allocation follows an algorithm like the one below
-  * (which is what we currently do in the YARN branch):
-  *   a) For each host, count the number of splits hosted on that host.
-  *   b) Decrement that count by the number of containers currently allocated on the host.
-  *   c) Compute rack info for each host and update the rack -> count map based on (b).
-  *   d) Allocate nodes based on (c).
-  *   e) On the allocation result, ensure that we don't allocate "too many" containers on a single
-  *      node (even if data locality on that node is very high): this prevents the job from
-  *      becoming fragile if a single host (or a small set of hosts) goes down.
-  *
-  * Repeat from (a) until the required number of nodes is allocated.
-  *
-  * If a node 'dies', follow the same procedure.
-  */
- def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
-
- val nodeToSplit = new HashMap[String, HashSet[SplitInfo]]
- for (inputSplit <- formats) {
- val splits = inputSplit.findPreferredLocations()
-
- for (split <- splits){
- val location = split.hostLocation
- val set = nodeToSplit.getOrElseUpdate(location, new HashSet[SplitInfo])
- set += split
- }
- }
-
- nodeToSplit
- }
-}
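For illustration, a minimal sketch of steps (a) and (b) from the computePreferredLocations comment above, in plain Scala. The host-to-split-count map would be derived from computePreferredLocations(formats) (e.g. by taking each set's size); allocatedContainers is a hypothetical host -> running-container count that a YARN-style allocator would maintain and does not appear in the code above.

def outstandingDemand(
    hostToSplitCount: Map[String, Int],
    allocatedContainers: Map[String, Int]): Map[String, Int] = {
  hostToSplitCount.map { case (host, splits) =>
    // (a) splits hosted on this host, (b) minus containers already running there
    host -> math.max(splits - allocatedContainers.getOrElse(host, 0), 0)
  }
}

// Example: host1 holds 3 splits and already has 1 container, host2 holds 2 splits.
// outstandingDemand(Map("host1" -> 3, "host2" -> 2), Map("host1" -> 1))
//   returns Map("host1" -> 2, "host2" -> 2)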
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobListener.scala b/core/src/main/scala/spark/scheduler/JobListener.scala
deleted file mode 100644
index af108b8..0000000
--- a/core/src/main/scala/spark/scheduler/JobListener.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * Interface used to listen for job completion or failure events after submitting a job to the
- * DAGScheduler. The listener is notified each time a task succeeds, as well as if the whole
- * job fails (and no further taskSucceeded events will happen).
- */
-private[spark] trait JobListener {
- def taskSucceeded(index: Int, result: Any)
- def jobFailed(exception: Exception)
-}
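For context, a minimal sketch of a JobListener implementation that stores each task's result by index and records a failure. This is only illustrative: the trait is private[spark], so a real implementation would have to live inside the spark package, and listeners are normally wired up by the DAGScheduler rather than constructed by hand.

class CollectingListener(totalTasks: Int) extends JobListener {
  val results = new Array[Any](totalTasks)
  @volatile var failure: Option[Exception] = None

  override def taskSucceeded(index: Int, result: Any) {
    results(index) = result          // one slot per task index
  }

  override def jobFailed(exception: Exception) {
    failure = Some(exception)        // record the failure that aborted the job
  }
}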
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala
deleted file mode 100644
index 1bc9fab..0000000
--- a/core/src/main/scala/spark/scheduler/JobLogger.scala
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io.PrintWriter
-import java.io.File
-import java.io.FileNotFoundException
-import java.text.SimpleDateFormat
-import java.util.{Date, Properties}
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{Map, HashMap, ListBuffer}
-import scala.io.Source
-
-import spark._
-import spark.executor.TaskMetrics
-import spark.scheduler.cluster.TaskInfo
-
-// Used to record runtime information for each job, including the RDD graph,
-// tasks' start/stop and shuffle information, and information supplied from outside.
-
-class JobLogger(val logDirName: String) extends SparkListener with Logging {
- private val logDir =
- if (System.getenv("SPARK_LOG_DIR") != null)
- System.getenv("SPARK_LOG_DIR")
- else
- "/tmp/spark"
- private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
- private val stageIDToJobID = new HashMap[Int, Int]
- private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
- private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
-
- createLogDir()
- def this() = this(String.valueOf(System.currentTimeMillis()))
-
- def getLogDir = logDir
- def getJobIDtoPrintWriter = jobIDToPrintWriter
- def getStageIDToJobID = stageIDToJobID
- def getJobIDToStages = jobIDToStages
- def getEventQueue = eventQueue
-
- // Create a folder for log files, the folder's name is the creation time of the jobLogger
- protected def createLogDir() {
- val dir = new File(logDir + "/" + logDirName + "/")
- if (dir.exists()) {
- return
- }
- if (dir.mkdirs() == false) {
- logError("create log directory error:" + logDir + "/" + logDirName + "/")
- }
- }
-
- // Create a log file for one job, the file name is the jobID
- protected def createLogWriter(jobID: Int) {
- try{
- val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
- jobIDToPrintWriter += (jobID -> fileWriter)
- } catch {
- case e: FileNotFoundException => e.printStackTrace()
- }
- }
-
- // Close log file, and clean the stage relationship in stageIDToJobID
- protected def closeLogWriter(jobID: Int) =
- jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
- fileWriter.close()
- jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
- stageIDToJobID -= stage.id
- })
- jobIDToPrintWriter -= jobID
- jobIDToStages -= jobID
- }
-
- // Write log information to the log file; the withTime parameter controls whether to record
- // a timestamp with the information
- protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
- var writeInfo = info
- if (withTime) {
- val date = new Date(System.currentTimeMillis())
- writeInfo = DATE_FORMAT.format(date) + ": " +info
- }
- jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
- }
-
- protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) =
- stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-
- protected def buildJobDep(jobID: Int, stage: Stage) {
- if (stage.jobId == jobID) {
- jobIDToStages.get(jobID) match {
- case Some(stageList) => stageList += stage
- case None => val stageList = new ListBuffer[Stage]
- stageList += stage
- jobIDToStages += (jobID -> stageList)
- }
- stageIDToJobID += (stage.id -> jobID)
- stage.parents.foreach(buildJobDep(jobID, _))
- }
- }
-
- protected def recordStageDep(jobID: Int) {
- def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
- var rddList = new ListBuffer[RDD[_]]
- rddList += rdd
- rdd.dependencies.foreach{ dep => dep match {
- case shufDep: ShuffleDependency[_,_] =>
- case _ => rddList ++= getRddsInStage(dep.rdd)
- }
- }
- rddList
- }
- jobIDToStages.get(jobID).foreach {_.foreach { stage =>
- var depRddDesc: String = ""
- getRddsInStage(stage.rdd).foreach { rdd =>
- depRddDesc += rdd.id + ","
- }
- var depStageDesc: String = ""
- stage.parents.foreach { stage =>
- depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
- }
- jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" +
- depRddDesc.substring(0, depRddDesc.length - 1) + ")" +
- " STAGE_DEP=" + depStageDesc, false)
- }
- }
- }
-
- // Generate indents and convert to String
- protected def indentString(indent: Int) = {
- val sb = new StringBuilder()
- for (i <- 1 to indent) {
- sb.append(" ")
- }
- sb.toString()
- }
-
- protected def getRddName(rdd: RDD[_]) = {
- var rddName = rdd.getClass.getName
- if (rdd.name != null) {
- rddName = rdd.name
- }
- rddName
- }
-
- protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
- val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
- jobLogInfo(jobID, indentString(indent) + rddInfo, false)
- rdd.dependencies.foreach{ dep => dep match {
- case shufDep: ShuffleDependency[_,_] =>
- val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
- jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
- case _ => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
- }
- }
- }
-
- protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
- var stageInfo: String = ""
- if (stage.isShuffleMap) {
- stageInfo = "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" +
- stage.shuffleDep.get.shuffleId
- }else{
- stageInfo = "STAGE_ID=" + stage.id + " RESULT_STAGE"
- }
- if (stage.jobId == jobID) {
- jobLogInfo(jobID, indentString(indent) + stageInfo, false)
- recordRddInStageGraph(jobID, stage.rdd, indent)
- stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
- } else
- jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
- }
-
- // Record task metrics into job log files
- protected def recordTaskMetrics(stageID: Int, status: String,
- taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
- val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
- " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
- " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
- val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
- val readMetrics =
- taskMetrics.shuffleReadMetrics match {
- case Some(metrics) =>
- " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
- " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
- " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
- " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
- " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
- " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
- " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
- case None => ""
- }
- val writeMetrics =
- taskMetrics.shuffleWriteMetrics match {
- case Some(metrics) =>
- " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
- case None => ""
- }
- stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
- }
-
- override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
- stageLogInfo(
- stageSubmitted.stage.id,
- "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
- stageSubmitted.stage.id, stageSubmitted.taskSize))
- }
-
- override def onStageCompleted(stageCompleted: StageCompleted) {
- stageLogInfo(
- stageCompleted.stageInfo.stage.id,
- "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
-
- }
-
- override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
- override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val task = taskEnd.task
- val taskInfo = taskEnd.taskInfo
- var taskStatus = ""
- task match {
- case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
- case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
- }
- taskEnd.reason match {
- case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
- case Resubmitted =>
- taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
- " STAGE_ID=" + task.stageId
- stageLogInfo(task.stageId, taskStatus)
- case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
- taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
- task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
- mapId + " REDUCE_ID=" + reduceId
- stageLogInfo(task.stageId, taskStatus)
- case OtherFailure(message) =>
- taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
- " STAGE_ID=" + task.stageId + " INFO=" + message
- stageLogInfo(task.stageId, taskStatus)
- case _ =>
- }
- }
-
- override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- val job = jobEnd.job
- var info = "JOB_ID=" + job.jobId
- jobEnd.jobResult match {
- case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception, _) =>
- info += " STATUS=FAILED REASON="
- exception.getMessage.split("\\s+").foreach(info += _ + "_")
- case _ =>
- }
- jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
- closeLogWriter(job.jobId)
- }
-
- protected def recordJobProperties(jobID: Int, properties: Properties) {
- if(properties != null) {
- val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
- jobLogInfo(jobID, description, false)
- }
- }
-
- override def onJobStart(jobStart: SparkListenerJobStart) {
- val job = jobStart.job
- val properties = jobStart.properties
- createLogWriter(job.jobId)
- recordJobProperties(job.jobId, properties)
- buildJobDep(job.jobId, job.finalStage)
- recordStageDep(job.jobId)
- recordStageDepGraph(job.jobId, job.finalStage)
- jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala
deleted file mode 100644
index a61b335..0000000
--- a/core/src/main/scala/spark/scheduler/JobResult.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * A result of a job in the DAGScheduler.
- */
-private[spark] sealed trait JobResult
-
-private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala
deleted file mode 100644
index 69cd161..0000000
--- a/core/src/main/scala/spark/scheduler/JobWaiter.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
- * results to the given handler function.
- */
-private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
- extends JobListener {
-
- private var finishedTasks = 0
-
- private var jobFinished = false // Is the job as a whole finished (succeeded or failed)?
- private var jobResult: JobResult = null // If the job is finished, this will be its result
-
- override def taskSucceeded(index: Int, result: Any) {
- synchronized {
- if (jobFinished) {
- throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
- }
- resultHandler(index, result.asInstanceOf[T])
- finishedTasks += 1
- if (finishedTasks == totalTasks) {
- jobFinished = true
- jobResult = JobSucceeded
- this.notifyAll()
- }
- }
- }
-
- override def jobFailed(exception: Exception) {
- synchronized {
- if (jobFinished) {
- throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
- }
- jobFinished = true
- jobResult = JobFailed(exception, None)
- this.notifyAll()
- }
- }
-
- def awaitResult(): JobResult = synchronized {
- while (!jobFinished) {
- this.wait()
- }
- return jobResult
- }
-}
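A sketch of the handshake JobWaiter implements: the scheduler thread reports results via taskSucceeded (or jobFailed), while the submitting thread blocks in awaitResult until the whole job is finished. Hypothetical usage, shown single-threaded for brevity; the real wiring is inside the DAGScheduler, and the class is private[spark].

val results = new Array[Int](2)
val waiter = new JobWaiter[Int](2, (index, value) => results(index) = value)

// Scheduler side (normally a different thread):
waiter.taskSucceeded(0, 10)
waiter.taskSucceeded(1, 32)

// Submitting side: returns once all tasks have reported in (or the job has failed).
waiter.awaitResult() match {
  case JobSucceeded    => println("sum = " + results.sum)
  case JobFailed(e, _) => println("job failed: " + e)
}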
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/MapStatus.scala b/core/src/main/scala/spark/scheduler/MapStatus.scala
deleted file mode 100644
index 2f6a68e..0000000
--- a/core/src/main/scala/spark/scheduler/MapStatus.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.storage.BlockManagerId
-import java.io.{ObjectOutput, ObjectInput, Externalizable}
-
-/**
- * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
- * task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
- * The map output sizes are compressed using MapOutputTracker.compressSize.
- */
-private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
- extends Externalizable {
-
- def this() = this(null, null) // For deserialization only
-
- def writeExternal(out: ObjectOutput) {
- location.writeExternal(out)
- out.writeInt(compressedSizes.length)
- out.write(compressedSizes)
- }
-
- def readExternal(in: ObjectInput) {
- location = BlockManagerId(in)
- compressedSizes = new Array[Byte](in.readInt())
- in.readFully(compressedSizes)
- }
-}
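The comment above refers to MapOutputTracker.compressSize. A hedged sketch of the kind of logarithmic encoding this implies (the real implementation may differ in details such as how it treats sizes of 0 or 1): a block size is squeezed into one byte by storing roughly log base 1.1 of the size, which covers sizes up to tens of gigabytes at the cost of precision.

object SizeCodec {
  private val LOG_BASE = 1.1

  // Encode a byte count into a single byte: 0 stays 0, otherwise ceil(log_1.1(size)).
  def compress(size: Long): Byte = {
    if (size == 0) {
      0.toByte
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

  // Decode back to an approximate byte count.
  def decompress(compressed: Byte): Long = {
    if (compressed == 0) 0L else math.pow(LOG_BASE, compressed & 0xFF).toLong
  }
}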
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
deleted file mode 100644
index d066df5..0000000
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark._
-import java.io._
-import util.{MetadataCleaner, TimeStampedHashMap}
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-private[spark] object ResultTask {
-
- // A simple map between the stage id to the serialized byte array of a task.
- // Served as a cache for task serialization because serialization can be
- // expensive on the master node if it needs to launch thousands of tasks.
- val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
-
- val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)
-
- def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
- synchronized {
- val old = serializedInfoCache.get(stageId).orNull
- if (old != null) {
- return old
- } else {
- val out = new ByteArrayOutputStream
- val ser = SparkEnv.get.closureSerializer.newInstance
- val objOut = ser.serializeStream(new GZIPOutputStream(out))
- objOut.writeObject(rdd)
- objOut.writeObject(func)
- objOut.close()
- val bytes = out.toByteArray
- serializedInfoCache.put(stageId, bytes)
- return bytes
- }
- }
- }
-
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
- val loader = Thread.currentThread.getContextClassLoader
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val ser = SparkEnv.get.closureSerializer.newInstance
- val objIn = ser.deserializeStream(in)
- val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
- return (rdd, func)
- }
-
- def clearCache() {
- synchronized {
- serializedInfoCache.clear()
- }
- }
-}
-
-
-private[spark] class ResultTask[T, U](
- stageId: Int,
- var rdd: RDD[T],
- var func: (TaskContext, Iterator[T]) => U,
- var partition: Int,
- @transient locs: Seq[TaskLocation],
- val outputId: Int)
- extends Task[U](stageId) with Externalizable {
-
- def this() = this(0, null, null, 0, null, 0)
-
- var split = if (rdd == null) {
- null
- } else {
- rdd.partitions(partition)
- }
-
- @transient private val preferredLocs: Seq[TaskLocation] = {
- if (locs == null) Nil else locs.toSet.toSeq
- }
-
- override def run(attemptId: Long): U = {
- val context = new TaskContext(stageId, partition, attemptId)
- metrics = Some(context.taskMetrics)
- try {
- func(context, rdd.iterator(split, context))
- } finally {
- context.executeOnCompleteCallbacks()
- }
- }
-
- override def preferredLocations: Seq[TaskLocation] = preferredLocs
-
- override def toString = "ResultTask(" + stageId + ", " + partition + ")"
-
- override def writeExternal(out: ObjectOutput) {
- RDDCheckpointData.synchronized {
- split = rdd.partitions(partition)
- out.writeInt(stageId)
- val bytes = ResultTask.serializeInfo(
- stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
- out.writeInt(bytes.length)
- out.write(bytes)
- out.writeInt(partition)
- out.writeInt(outputId)
- out.writeLong(epoch)
- out.writeObject(split)
- }
- }
-
- override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
- val numBytes = in.readInt()
- val bytes = new Array[Byte](numBytes)
- in.readFully(bytes)
- val (rdd_, func_) = ResultTask.deserializeInfo(stageId, bytes)
- rdd = rdd_.asInstanceOf[RDD[T]]
- func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
- partition = in.readInt()
- val outputId = in.readInt()
- epoch = in.readLong()
- split = in.readObject().asInstanceOf[Partition]
- }
-}
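The serializedInfoCache above follows a get-or-compute pattern: a stage's (rdd, func) pair is serialized once, keyed by stage id, so launching thousands of tasks for the same stage reuses one byte array. A generic sketch of that pattern with a plain mutable map (the real code uses TimeStampedHashMap so a MetadataCleaner can expire old entries):

import scala.collection.mutable

object InfoCache {
  private val cache = new mutable.HashMap[Int, Array[Byte]]

  // Serialize at most once per stage id; later callers get the cached bytes.
  def getOrSerialize(stageId: Int)(serialize: => Array[Byte]): Array[Byte] =
    cache.synchronized {
      cache.getOrElseUpdate(stageId, serialize)
    }
}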
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
deleted file mode 100644
index f2a0385..0000000
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
-
-import spark._
-import spark.executor.ShuffleWriteMetrics
-import spark.storage._
-import spark.util.{TimeStampedHashMap, MetadataCleaner}
-
-
-private[spark] object ShuffleMapTask {
-
- // A simple map between the stage id to the serialized byte array of a task.
- // Served as a cache for task serialization because serialization can be
- // expensive on the master node if it needs to launch thousands of tasks.
- val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
-
- val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)
-
- def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
- synchronized {
- val old = serializedInfoCache.get(stageId).orNull
- if (old != null) {
- return old
- } else {
- val out = new ByteArrayOutputStream
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objOut = ser.serializeStream(new GZIPOutputStream(out))
- objOut.writeObject(rdd)
- objOut.writeObject(dep)
- objOut.close()
- val bytes = out.toByteArray
- serializedInfoCache.put(stageId, bytes)
- return bytes
- }
- }
- }
-
- def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
- synchronized {
- val loader = Thread.currentThread.getContextClassLoader
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val ser = SparkEnv.get.closureSerializer.newInstance()
- val objIn = ser.deserializeStream(in)
- val rdd = objIn.readObject().asInstanceOf[RDD[_]]
- val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
- return (rdd, dep)
- }
- }
-
- // Since both the JarSet and FileSet have the same format this is used for both.
- def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
- val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
- val objIn = new ObjectInputStream(in)
- val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
- return (HashMap(set.toSeq: _*))
- }
-
- def clearCache() {
- synchronized {
- serializedInfoCache.clear()
- }
- }
-}
-
-private[spark] class ShuffleMapTask(
- stageId: Int,
- var rdd: RDD[_],
- var dep: ShuffleDependency[_,_],
- var partition: Int,
- @transient private var locs: Seq[TaskLocation])
- extends Task[MapStatus](stageId)
- with Externalizable
- with Logging {
-
- protected def this() = this(0, null, null, 0, null)
-
- @transient private val preferredLocs: Seq[TaskLocation] = {
- if (locs == null) Nil else locs.toSet.toSeq
- }
-
- var split = if (rdd == null) null else rdd.partitions(partition)
-
- override def writeExternal(out: ObjectOutput) {
- RDDCheckpointData.synchronized {
- split = rdd.partitions(partition)
- out.writeInt(stageId)
- val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
- out.writeInt(bytes.length)
- out.write(bytes)
- out.writeInt(partition)
- out.writeLong(epoch)
- out.writeObject(split)
- }
- }
-
- override def readExternal(in: ObjectInput) {
- val stageId = in.readInt()
- val numBytes = in.readInt()
- val bytes = new Array[Byte](numBytes)
- in.readFully(bytes)
- val (rdd_, dep_) = ShuffleMapTask.deserializeInfo(stageId, bytes)
- rdd = rdd_
- dep = dep_
- partition = in.readInt()
- epoch = in.readLong()
- split = in.readObject().asInstanceOf[Partition]
- }
-
- override def run(attemptId: Long): MapStatus = {
- val numOutputSplits = dep.partitioner.numPartitions
-
- val taskContext = new TaskContext(stageId, partition, attemptId)
- metrics = Some(taskContext.taskMetrics)
-
- val blockManager = SparkEnv.get.blockManager
- var shuffle: ShuffleBlocks = null
- var buckets: ShuffleWriterGroup = null
-
- try {
- // Obtain all the block writers for shuffle blocks.
- val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
- shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
- buckets = shuffle.acquireWriters(partition)
-
- // Write the map output to its associated buckets.
- for (elem <- rdd.iterator(split, taskContext)) {
- val pair = elem.asInstanceOf[Product2[Any, Any]]
- val bucketId = dep.partitioner.getPartition(pair._1)
- buckets.writers(bucketId).write(pair)
- }
-
- // Commit the writes. Get the size of each bucket block (total block size).
- var totalBytes = 0L
- val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
- writer.commit()
- writer.close()
- val size = writer.size()
- totalBytes += size
- MapOutputTracker.compressSize(size)
- }
-
- // Update shuffle metrics.
- val shuffleMetrics = new ShuffleWriteMetrics
- shuffleMetrics.shuffleBytesWritten = totalBytes
- metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
-
- return new MapStatus(blockManager.blockManagerId, compressedSizes)
- } catch { case e: Exception =>
- // If there is an exception from running the task, revert the partial writes
- // and throw the exception upstream to Spark.
- if (buckets != null) {
- buckets.writers.foreach(_.revertPartialWrites())
- }
- throw e
- } finally {
- // Release the writers back to the shuffle block manager.
- if (shuffle != null && buckets != null) {
- shuffle.releaseWriters(buckets)
- }
- // Execute the callbacks on task completion.
- taskContext.executeOnCompleteCallbacks()
- }
- }
-
- override def preferredLocations: Seq[TaskLocation] = preferredLocs
-
- override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala
deleted file mode 100644
index e553101..0000000
--- a/core/src/main/scala/spark/scheduler/SparkListener.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.Properties
-import spark.scheduler.cluster.TaskInfo
-import spark.util.Distribution
-import spark.{Logging, SparkContext, TaskEndReason, Utils}
-import spark.executor.TaskMetrics
-
-sealed trait SparkListenerEvents
-
-case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
- extends SparkListenerEvents
-
-case class StageCompleted(val stageInfo: StageInfo) extends SparkListenerEvents
-
-case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
-
-case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) extends SparkListenerEvents
-
-case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
- extends SparkListenerEvents
-
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
- extends SparkListenerEvents
-
-trait SparkListener {
- /**
- * Called when a stage is completed, with information on the completed stage
- */
- def onStageCompleted(stageCompleted: StageCompleted) { }
-
- /**
- * Called when a stage is submitted
- */
- def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { }
-
- /**
- * Called when a task starts
- */
- def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
- /**
- * Called when a task ends
- */
- def onTaskEnd(taskEnd: SparkListenerTaskEnd) { }
-
- /**
- * Called when a job starts
- */
- def onJobStart(jobStart: SparkListenerJobStart) { }
-
- /**
- * Called when a job ends
- */
- def onJobEnd(jobEnd: SparkListenerJobEnd) { }
-
-}
-
-/**
- * Simple SparkListener that logs a few summary statistics when each stage completes
- */
-class StatsReportListener extends SparkListener with Logging {
- override def onStageCompleted(stageCompleted: StageCompleted) {
- import spark.scheduler.StatsReportListener._
- implicit val sc = stageCompleted
- this.logInfo("Finished stage: " + stageCompleted.stageInfo)
- showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
-
- //shuffle write
- showBytesDistribution("shuffle bytes written:",(_,metric) => metric.shuffleWriteMetrics.map{_.shuffleBytesWritten})
-
- //fetch & io
- showMillisDistribution("fetch wait time:",(_, metric) => metric.shuffleReadMetrics.map{_.fetchWaitTime})
- showBytesDistribution("remote bytes read:", (_, metric) => metric.shuffleReadMetrics.map{_.remoteBytesRead})
- showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
-
- //runtime breakdown
-
- val runtimePcts = stageCompleted.stageInfo.taskInfos.map{
- case (info, metrics) => RuntimePercentage(info.duration, metrics)
- }
- showDistribution("executor (non-fetch) time pct: ", Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
- showDistribution("fetch wait time pct: ", Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
- showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
- }
-
-}
-
-object StatsReportListener extends Logging {
-
- //for profiling, the extremes are more interesting
- val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
- val probabilities = percentiles.map{_ / 100.0}
- val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
-
- def extractDoubleDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Double]): Option[Distribution] = {
- Distribution(stage.stageInfo.taskInfos.flatMap{
- case ((info,metric)) => getMetric(info, metric)})
- }
-
- //is there some way to setup the types that I can get rid of this completely?
- def extractLongDistribution(stage:StageCompleted, getMetric: (TaskInfo,TaskMetrics) => Option[Long]): Option[Distribution] = {
- extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
- }
-
- def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
- val stats = d.statCounter
- logInfo(heading + stats)
- val quantiles = d.getQuantiles(probabilities).map{formatNumber}
- logInfo(percentilesHeader)
- logInfo("\t" + quantiles.mkString("\t"))
- }
-
- def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
- dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
- }
-
- def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
- def f(d:Double) = format.format(d)
- showDistribution(heading, dOpt, f _)
- }
-
- def showDistribution(heading:String, format: String, getMetric: (TaskInfo,TaskMetrics) => Option[Double])
- (implicit stage: StageCompleted) {
- showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
- }
-
- def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
- (implicit stage: StageCompleted) {
- showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
- }
-
- def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
- dOpt.foreach{dist => showBytesDistribution(heading, dist)}
- }
-
- def showBytesDistribution(heading: String, dist: Distribution) {
- showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
- }
-
- def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
- showDistribution(heading, dOpt, (d => StatsReportListener.millisToString(d.toLong)): Double => String)
- }
-
- def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
- (implicit stage: StageCompleted) {
- showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
- }
-
-
-
- val seconds = 1000L
- val minutes = seconds * 60
- val hours = minutes * 60
-
- /**
- * reformat a time interval in milliseconds to a prettier format for output
- */
- def millisToString(ms: Long) = {
- val (size, units) =
- if (ms > hours) {
- (ms.toDouble / hours, "hours")
- } else if (ms > minutes) {
- (ms.toDouble / minutes, "min")
- } else if (ms > seconds) {
- (ms.toDouble / seconds, "s")
- } else {
- (ms.toDouble, "ms")
- }
- "%.1f %s".format(size, units)
- }
-}
-
-
-
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
- def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
- val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
- val fetch = fetchTime.map{_ / denom}
- val exec = (metrics.executorRunTime - fetchTime.getOrElse(0l)) / denom
- val other = 1.0 - (exec + fetch.getOrElse(0d))
- RuntimePercentage(exec, fetch, other)
- }
-}
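A minimal sketch of a custom SparkListener: every callback in the trait has an empty default body, so a listener only overrides the events it cares about. The imports and registration path are assumptions; registration would go through something like the SparkListenerBus shown in the next file.

import spark.Logging
import spark.scheduler._

class StageTimingListener extends SparkListener with Logging {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
    logInfo("Submitted stage " + stageSubmitted.stage.id +
      " with " + stageSubmitted.taskSize + " tasks")
  }

  override def onStageCompleted(stageCompleted: StageCompleted) {
    logInfo("Completed stage " + stageCompleted.stageInfo.stage.id)
  }
}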
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
deleted file mode 100644
index f55ed45..0000000
--- a/core/src/main/scala/spark/scheduler/SparkListenerBus.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import spark.Logging
-
-/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
- private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
-
- new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- event match {
- case stageSubmitted: SparkListenerStageSubmitted =>
- sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
- case stageCompleted: StageCompleted =>
- sparkListeners.foreach(_.onStageCompleted(stageCompleted))
- case jobStart: SparkListenerJobStart =>
- sparkListeners.foreach(_.onJobStart(jobStart))
- case jobEnd: SparkListenerJobEnd =>
- sparkListeners.foreach(_.onJobEnd(jobEnd))
- case taskStart: SparkListenerTaskStart =>
- sparkListeners.foreach(_.onTaskStart(taskStart))
- case taskEnd: SparkListenerTaskEnd =>
- sparkListeners.foreach(_.onTaskEnd(taskEnd))
- case _ =>
- }
- }
- }
- }.start()
-
- def addListener(listener: SparkListener) {
- sparkListeners += listener
- }
-
- def post(event: SparkListenerEvents) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
- logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
- "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
- "rate at which tasks are being started by the scheduler.")
- queueFullErrorMessageLogged = true
- }
- }
-}
-
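Driving the bus above is a two-step affair: register listeners, then post events. A short hypothetical sketch; in practice the DAGScheduler owns the single bus instance, and post() never blocks, it simply drops events (with one logged error) once the 10000-entry queue is full.

val bus = new SparkListenerBus()
bus.addListener(new SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
    println("job " + jobEnd.job.jobId + " finished: " + jobEnd.jobResult)
  }
})
// Posted from the scheduler as events occur, e.g.:
// bus.post(SparkListenerJobEnd(job, JobSucceeded))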
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/SplitInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/SplitInfo.scala b/core/src/main/scala/spark/scheduler/SplitInfo.scala
deleted file mode 100644
index 4e3661e..0000000
--- a/core/src/main/scala/spark/scheduler/SplitInfo.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import collection.mutable.ArrayBuffer
-
-// Information about a specific split instance: handles both mapred and mapreduce split
-// instances, so that we do not need to worry about the differences between them.
-class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
- val length: Long, val underlyingSplit: Any) {
- override def toString(): String = {
- "SplitInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz +
- ", hostLocation : " + hostLocation + ", path : " + path +
- ", length : " + length + ", underlyingSplit " + underlyingSplit
- }
-
- override def hashCode(): Int = {
- var hashCode = inputFormatClazz.hashCode
- hashCode = hashCode * 31 + hostLocation.hashCode
- hashCode = hashCode * 31 + path.hashCode
- // ignore overflow ? It is hashcode anyway !
- hashCode = hashCode * 31 + (length & 0x7fffffff).toInt
- hashCode
- }
-
- // This is practically useless since most Split impls don't seem to implement equals :-(
- // So unless there is identity equality between underlyingSplits, it will always fail even if it
- // is pointing to the same block.
- override def equals(other: Any): Boolean = other match {
- case that: SplitInfo => {
- this.hostLocation == that.hostLocation &&
- this.inputFormatClazz == that.inputFormatClazz &&
- this.path == that.path &&
- this.length == that.length &&
- // other split specific checks (like start for FileSplit)
- this.underlyingSplit == that.underlyingSplit
- }
- case _ => false
- }
-}
-
-object SplitInfo {
-
- def toSplitInfo(inputFormatClazz: Class[_], path: String,
- mapredSplit: org.apache.hadoop.mapred.InputSplit): Seq[SplitInfo] = {
- val retval = new ArrayBuffer[SplitInfo]()
- val length = mapredSplit.getLength
- for (host <- mapredSplit.getLocations) {
- retval += new SplitInfo(inputFormatClazz, host, path, length, mapredSplit)
- }
- retval
- }
-
- def toSplitInfo(inputFormatClazz: Class[_], path: String,
- mapreduceSplit: org.apache.hadoop.mapreduce.InputSplit): Seq[SplitInfo] = {
- val retval = new ArrayBuffer[SplitInfo]()
- val length = mapreduceSplit.getLength
- for (host <- mapreduceSplit.getLocations) {
- retval += new SplitInfo(inputFormatClazz, host, path, length, mapreduceSplit)
- }
- retval
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala
deleted file mode 100644
index c599c00..0000000
--- a/core/src/main/scala/spark/scheduler/Stage.scala
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.net.URI
-
-import spark._
-import spark.storage.BlockManagerId
-
-/**
- * A stage is a set of independent tasks all computing the same function that need to run as part
- * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
- * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
- * DAGScheduler runs these stages in topological order.
- *
- * Each Stage can either be a shuffle map stage, in which case its tasks' results are input for
- * another stage, or a result stage, in which case its tasks directly compute the action that
- * initiated a job (e.g. count(), save(), etc). For shuffle map stages, we also track the nodes
- * that each output partition is on.
- *
- * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
- * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
- * faster on failure.
- */
-private[spark] class Stage(
- val id: Int,
- val rdd: RDD[_],
- val shuffleDep: Option[ShuffleDependency[_,_]], // Output shuffle if stage is a map stage
- val parents: List[Stage],
- val jobId: Int,
- callSite: Option[String])
- extends Logging {
-
- val isShuffleMap = shuffleDep != None
- val numPartitions = rdd.partitions.size
- val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
- var numAvailableOutputs = 0
-
- /** When first task was submitted to scheduler. */
- var submissionTime: Option[Long] = None
- var completionTime: Option[Long] = None
-
- private var nextAttemptId = 0
-
- def isAvailable: Boolean = {
- if (!isShuffleMap) {
- true
- } else {
- numAvailableOutputs == numPartitions
- }
- }
-
- def addOutputLoc(partition: Int, status: MapStatus) {
- val prevList = outputLocs(partition)
- outputLocs(partition) = status :: prevList
- if (prevList == Nil)
- numAvailableOutputs += 1
- }
-
- def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
- val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.location == bmAddress)
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil) {
- numAvailableOutputs -= 1
- }
- }
-
- def removeOutputsOnExecutor(execId: String) {
- var becameUnavailable = false
- for (partition <- 0 until numPartitions) {
- val prevList = outputLocs(partition)
- val newList = prevList.filterNot(_.location.executorId == execId)
- outputLocs(partition) = newList
- if (prevList != Nil && newList == Nil) {
- becameUnavailable = true
- numAvailableOutputs -= 1
- }
- }
- if (becameUnavailable) {
- logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
- this, execId, numAvailableOutputs, numPartitions, isAvailable))
- }
- }
-
- def newAttemptId(): Int = {
- val id = nextAttemptId
- nextAttemptId += 1
- return id
- }
-
- val name = callSite.getOrElse(rdd.origin)
-
- override def toString = "Stage " + id
-
- override def hashCode(): Int = id
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/StageInfo.scala b/core/src/main/scala/spark/scheduler/StageInfo.scala
deleted file mode 100644
index c4026f9..0000000
--- a/core/src/main/scala/spark/scheduler/StageInfo.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.TaskInfo
-import scala.collection._
-import spark.executor.TaskMetrics
-
-case class StageInfo(
- val stage: Stage,
- val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()
-) {
- override def toString = stage.rdd.toString
-}
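The removed StageInfo is simply a Stage paired with a growable buffer of (TaskInfo, TaskMetrics) tuples. A small sketch of how a consumer (for example a UI or listener) might aggregate such a buffer is below; FakeTaskInfo and FakeTaskMetrics are illustrative stand-ins for the real TaskInfo and TaskMetrics classes.

import scala.collection.mutable

case class FakeTaskInfo(taskId: Long, durationMs: Long)   // stand-in for TaskInfo
case class FakeTaskMetrics(resultBytes: Long)             // stand-in for TaskMetrics

object StageInfoDemo extends App {
  // Mirrors StageInfo's mutable buffer of per-task records.
  val taskInfos = mutable.Buffer[(FakeTaskInfo, FakeTaskMetrics)]()
  taskInfos += (FakeTaskInfo(0L, 120L) -> FakeTaskMetrics(2048L))
  taskInfos += (FakeTaskInfo(1L, 95L) -> FakeTaskMetrics(1024L))

  val totalMs = taskInfos.map(_._1.durationMs).sum
  println(s"tasks=${taskInfos.size} totalDurationMs=$totalMs")
}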
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
deleted file mode 100644
index 0ab2ae6..0000000
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.serializer.SerializerInstance
-import java.io.{DataInputStream, DataOutputStream}
-import java.nio.ByteBuffer
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-import spark.util.ByteBufferInputStream
-import scala.collection.mutable.HashMap
-import spark.executor.TaskMetrics
-
-/**
- * A task to execute on a worker node.
- */
-private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
- def run(attemptId: Long): T
- def preferredLocations: Seq[TaskLocation] = Nil
-
- var epoch: Long = -1 // Map output tracker epoch. Will be set by TaskScheduler.
-
- var metrics: Option[TaskMetrics] = None
-
-}
-
-/**
- * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
- * need to send the list of JARs and files added to the SparkContext with each task to ensure that
- * worker nodes find out about it, but we can't make it part of the Task because the user's code in
- * the task might depend on one of the JARs. Thus we serialize each task as multiple objects, by
- * first writing out its dependencies.
- */
-private[spark] object Task {
- /**
- * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
- */
- def serializeWithDependencies(
- task: Task[_],
- currentFiles: HashMap[String, Long],
- currentJars: HashMap[String, Long],
- serializer: SerializerInstance)
- : ByteBuffer = {
-
- val out = new FastByteArrayOutputStream(4096)
- val dataOut = new DataOutputStream(out)
-
- // Write currentFiles
- dataOut.writeInt(currentFiles.size)
- for ((name, timestamp) <- currentFiles) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write currentJars
- dataOut.writeInt(currentJars.size)
- for ((name, timestamp) <- currentJars) {
- dataOut.writeUTF(name)
- dataOut.writeLong(timestamp)
- }
-
- // Write the task itself and finish
- dataOut.flush()
- val taskBytes = serializer.serialize(task).array()
- out.write(taskBytes)
- out.trim()
- ByteBuffer.wrap(out.array)
- }
-
- /**
- * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
- * and return the task itself as a serialized ByteBuffer. The caller can then update its
- * ClassLoaders and deserialize the task.
- *
- * @return (taskFiles, taskJars, taskBytes)
- */
- def deserializeWithDependencies(serializedTask: ByteBuffer)
- : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
-
- val in = new ByteBufferInputStream(serializedTask)
- val dataIn = new DataInputStream(in)
-
- // Read task's files
- val taskFiles = new HashMap[String, Long]()
- val numFiles = dataIn.readInt()
- for (i <- 0 until numFiles) {
- taskFiles(dataIn.readUTF()) = dataIn.readLong()
- }
-
- // Read task's JARs
- val taskJars = new HashMap[String, Long]()
- val numJars = dataIn.readInt()
- for (i <- 0 until numJars) {
- taskJars(dataIn.readUTF()) = dataIn.readLong()
- }
-
- // Create a sub-buffer for the rest of the data, which is the serialized Task object
- val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
- (taskFiles, taskJars, subBuffer)
- }
-}
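The Task companion above frames a task as: file count and (name, timestamp) pairs, then JAR count and pairs, then the serialized task bytes, so a worker can update its class loaders before touching the task itself. The sketch below reproduces that framing idea only, using plain java.io in place of Spark's SerializerInstance and FastByteArrayOutputStream; DependencyFraming and FramingDemo are hypothetical names.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.HashMap

object DependencyFraming {
  def serialize(files: HashMap[String, Long], jars: HashMap[String, Long],
                payload: Array[Byte]): ByteBuffer = {
    val out = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(out)
    def writeMap(m: HashMap[String, Long]): Unit = {
      dataOut.writeInt(m.size)
      for ((name, ts) <- m) { dataOut.writeUTF(name); dataOut.writeLong(ts) }
    }
    writeMap(files)              // dependencies first, so the worker can update
    writeMap(jars)               // its class loaders before deserializing the task
    dataOut.flush()
    out.write(payload)           // the serialized task itself goes last
    ByteBuffer.wrap(out.toByteArray)
  }

  def deserialize(buf: ByteBuffer): (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
    val bytes = new Array[Byte](buf.remaining()); buf.duplicate().get(bytes)
    val dataIn = new DataInputStream(new ByteArrayInputStream(bytes))
    def readMap(): HashMap[String, Long] = {
      val m = new HashMap[String, Long]()
      for (_ <- 0 until dataIn.readInt()) m(dataIn.readUTF()) = dataIn.readLong()
      m
    }
    val files = readMap()
    val jars = readMap()
    // Whatever remains after the two maps is the task payload.
    val consumed = bytes.length - dataIn.available()
    (files, jars, ByteBuffer.wrap(bytes, consumed, dataIn.available()))
  }
}

object FramingDemo extends App {
  val buf = DependencyFraming.serialize(
    HashMap("a.txt" -> 1L), HashMap("lib.jar" -> 2L), "task".getBytes)
  val (files, jars, payload) = DependencyFraming.deserialize(buf)
  println(files + " " + jars + " " +
    new String(payload.array, payload.position, payload.remaining))
}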
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskLocation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskLocation.scala b/core/src/main/scala/spark/scheduler/TaskLocation.scala
deleted file mode 100644
index fea117e..0000000
--- a/core/src/main/scala/spark/scheduler/TaskLocation.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-/**
- * A location where a task should run. This can either be a host or a (host, executorID) pair.
- * In the latter case, we will prefer to launch the task on that executorID, but our next level
- * of preference will be executors on the same host if this is not possible.
- */
-private[spark]
-class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
- override def toString: String = "TaskLocation(" + host + ", " + executorId + ")"
-}
-
-private[spark] object TaskLocation {
- def apply(host: String, executorId: String) = new TaskLocation(host, Some(executorId))
-
- def apply(host: String) = new TaskLocation(host, None)
-}
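TaskLocation encodes a two-level preference: an exact executor match is best, the same host is next best, and anything else last. A hedged sketch of ranking resource offers against such a preference follows; Offer, Preference and the numeric scoring are illustrative, not part of the scheduler API.

case class Offer(host: String, executorId: String)
case class Preference(host: String, executorId: Option[String])

object LocalityDemo extends App {
  // Lower score = better match: 0 exact executor, 1 same host, 2 anything else.
  def score(pref: Preference, offer: Offer): Int =
    if (pref.executorId.contains(offer.executorId)) 0
    else if (pref.host == offer.host) 1
    else 2

  val pref = Preference("host1", Some("exec-7"))
  val offers = Seq(Offer("host2", "exec-9"), Offer("host1", "exec-3"), Offer("host1", "exec-7"))
  println(offers.minBy(score(pref, _)))   // Offer(host1,exec-7)
}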
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
deleted file mode 100644
index fc48567..0000000
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.io._
-
-import scala.collection.mutable.Map
-import spark.executor.TaskMetrics
-import spark.{Utils, SparkEnv}
-import java.nio.ByteBuffer
-
-// Task result. Also contains updates to accumulator variables.
-// TODO: Use of distributed cache to return result is a hack to get around
-// what seems to be a bug with messages over 60KB in libprocess; fix it
-private[spark]
-class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
- extends Externalizable
-{
- def this() = this(null.asInstanceOf[T], null, null)
-
- override def writeExternal(out: ObjectOutput) {
-
- val objectSer = SparkEnv.get.serializer.newInstance()
- val bb = objectSer.serialize(value)
-
- out.writeInt(bb.remaining())
- Utils.writeByteBuffer(bb, out)
-
- out.writeInt(accumUpdates.size)
- for ((key, value) <- accumUpdates) {
- out.writeLong(key)
- out.writeObject(value)
- }
- out.writeObject(metrics)
- }
-
- override def readExternal(in: ObjectInput) {
-
- val objectSer = SparkEnv.get.serializer.newInstance()
-
- val blen = in.readInt()
- val byteVal = new Array[Byte](blen)
- in.readFully(byteVal)
- value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
-
- val numUpdates = in.readInt
- if (numUpdates == 0) {
- accumUpdates = null
- } else {
- accumUpdates = Map()
- for (i <- 0 until numUpdates) {
- accumUpdates(in.readLong()) = in.readObject()
- }
- }
- metrics = in.readObject().asInstanceOf[TaskMetrics]
- }
-}
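The removed TaskResult uses the Externalizable pattern: a length-prefixed block of serialized value bytes, followed by the accumulator updates as a count plus (id, update) pairs, then the metrics. The sketch below round-trips the same pattern with plain Java serialization standing in for SparkEnv's serializer; MiniResult is a hypothetical name, not the Spark class.

import java.io._
import scala.collection.mutable

class MiniResult(var valueBytes: Array[Byte], var accumUpdates: mutable.Map[Long, Any])
    extends Externalizable {
  def this() = this(null, null)   // public no-arg constructor required by Externalizable

  override def writeExternal(out: ObjectOutput): Unit = {
    out.writeInt(valueBytes.length)            // length prefix, then the raw value bytes
    out.write(valueBytes)
    out.writeInt(accumUpdates.size)            // then each accumulator (id, update) pair
    for ((k, v) <- accumUpdates) { out.writeLong(k); out.writeObject(v) }
  }

  override def readExternal(in: ObjectInput): Unit = {
    val buf = new Array[Byte](in.readInt())
    in.readFully(buf)
    valueBytes = buf
    accumUpdates = mutable.Map()
    for (_ <- 0 until in.readInt()) accumUpdates(in.readLong()) = in.readObject()
  }
}

object MiniResultDemo extends App {
  val bytes = new ByteArrayOutputStream()
  val oos = new ObjectOutputStream(bytes)
  oos.writeObject(new MiniResult("42".getBytes, mutable.Map[Long, Any](1L -> 10)))
  oos.close()
  val back = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[MiniResult]
  println(new String(back.valueBytes) + " " + back.accumUpdates)
}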
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/spark/scheduler/TaskScheduler.scala
deleted file mode 100644
index 4943d58..0000000
--- a/core/src/main/scala/spark/scheduler/TaskScheduler.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.Pool
-import spark.scheduler.cluster.SchedulingMode.SchedulingMode
-/**
- * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
- * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
- * and are responsible for sending the tasks to the cluster, running them, retrying if there
- * are failures, and mitigating stragglers. They return events to the DAGScheduler through
- * the TaskSchedulerListener interface.
- */
-private[spark] trait TaskScheduler {
-
- def rootPool: Pool
-
- def schedulingMode: SchedulingMode
-
- def start(): Unit
-
-  // Invoked after the system has successfully initialized (typically in the Spark context).
-  // YARN uses this to bootstrap allocation of resources based on preferred locations, wait for slave registrations, etc.
- def postStartHook() { }
-
- // Disconnect from the cluster.
- def stop(): Unit
-
- // Submit a sequence of tasks to run.
- def submitTasks(taskSet: TaskSet): Unit
-
- // Set a listener for upcalls. This is guaranteed to be set before submitTasks is called.
- def setListener(listener: TaskSchedulerListener): Unit
-
- // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
- def defaultParallelism(): Int
-}
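The contract above is essentially: set the listener, start, submit task sets, and get completion events back through upcalls. The toy in-process implementation below shows only that call order and the upcall path; MiniScheduler, MiniListener and InlineScheduler are simplified stand-ins, not the Spark traits.

trait MiniListener { def taskEnded(taskId: Int, result: Any): Unit }

trait MiniScheduler {
  def start(): Unit
  def setListener(l: MiniListener): Unit      // must be set before submitTasks
  def submitTasks(tasks: Seq[() => Any]): Unit
  def stop(): Unit
  def defaultParallelism(): Int
}

class InlineScheduler extends MiniScheduler {
  private var listener: MiniListener = _
  def start(): Unit = ()
  def setListener(l: MiniListener): Unit = { listener = l }
  def submitTasks(tasks: Seq[() => Any]): Unit =
    // Run each task inline and report completion through the listener upcall.
    tasks.zipWithIndex.foreach { case (t, i) => listener.taskEnded(i, t()) }
  def stop(): Unit = ()
  def defaultParallelism(): Int = Runtime.getRuntime.availableProcessors()
}

object SchedulerDemo extends App {
  val sched = new InlineScheduler
  sched.setListener(new MiniListener {
    def taskEnded(taskId: Int, result: Any): Unit = println(s"task $taskId -> $result")
  })
  sched.start()
  sched.submitTasks(Seq(() => 1 + 1, () => "hello".length))
  sched.stop()
}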
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
deleted file mode 100644
index 64be50b..0000000
--- a/core/src/main/scala/spark/scheduler/TaskSchedulerListener.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import spark.scheduler.cluster.TaskInfo
-import scala.collection.mutable.Map
-
-import spark.TaskEndReason
-import spark.executor.TaskMetrics
-
-/**
- * Interface for getting events back from the TaskScheduler.
- */
-private[spark] trait TaskSchedulerListener {
- // A task has started.
- def taskStarted(task: Task[_], taskInfo: TaskInfo)
-
- // A task has finished or failed.
- def taskEnded(task: Task[_], reason: TaskEndReason, result: Any, accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo, taskMetrics: TaskMetrics): Unit
-
- // A node was added to the cluster.
- def executorGained(execId: String, host: String): Unit
-
- // A node was lost from the cluster.
- def executorLost(execId: String): Unit
-
- // The TaskScheduler wants to abort an entire task set.
- def taskSetFailed(taskSet: TaskSet, reason: String): Unit
-}
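One typical job of a taskEnded-style upcall is folding each task's accumulator updates into driver-side totals; the (Long -> Any) shape below mirrors the accumUpdates parameter above, while the merge rule and names are purely illustrative.

import scala.collection.mutable

object AccumMergeDemo extends App {
  // Driver-side accumulator values, keyed by accumulator id.
  val driverAccums = mutable.Map[Long, Long](1L -> 0L, 2L -> 0L)

  // Called once per finished task with that task's accumulator deltas.
  def onTaskEnded(accumUpdates: Map[Long, Any]): Unit =
    for ((id, delta) <- accumUpdates)
      driverAccums(id) = driverAccums(id) + delta.asInstanceOf[Long]

  onTaskEnded(Map(1L -> 3L, 2L -> 5L))
  onTaskEnded(Map(1L -> 4L))
  println(driverAccums)   // Map(1 -> 7, 2 -> 5)
}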
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/scheduler/TaskSet.scala b/core/src/main/scala/spark/scheduler/TaskSet.scala
deleted file mode 100644
index dc3550d..0000000
--- a/core/src/main/scala/spark/scheduler/TaskSet.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.scheduler
-
-import java.util.Properties
-
-/**
- * A set of tasks submitted together to the low-level TaskScheduler, usually representing
- * missing partitions of a particular stage.
- */
-private[spark] class TaskSet(
- val tasks: Array[Task[_]],
- val stageId: Int,
- val attempt: Int,
- val priority: Int,
- val properties: Properties) {
- val id: String = stageId + "." + attempt
-
- override def toString: String = "TaskSet " + id
-}
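TaskSet's id is just "stageId.attempt", and its priority field is what FIFO scheduling orders by (earlier jobs first). A short sketch with a stand-in type, not the Spark class:

case class MiniTaskSet(stageId: Int, attempt: Int, priority: Int) {
  val id: String = stageId + "." + attempt   // same derivation as TaskSet.id above
}

object TaskSetDemo extends App {
  val pending = Seq(MiniTaskSet(3, 0, priority = 2),
                    MiniTaskSet(1, 1, priority = 0),
                    MiniTaskSet(2, 0, priority = 1))
  // FIFO: pick the task set whose originating job was submitted first.
  val next = pending.minBy(_.priority)
  println(s"next=${next.id}")   // next=1.1
}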