You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:19 UTC
[35/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
new file mode 100644
index 0000000..5b07933
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection.mutable.Map
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+
+/**
+ * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
+ * architecture: any thread can post an event (e.g. a task finishing or a new job being
+ * submitted), but a single "logic" thread reads these events and makes the decisions. This
+ * greatly simplifies synchronization.
+ */
+private[spark] sealed trait DAGSchedulerEvent
+
+/**
+ * A request to run `func` over the given `partitions` of `finalRDD`, reporting to `listener`.
+ * `properties` is null when the submitter supplies none.
+ */
+private[spark] case class JobSubmitted(
+    finalRDD: RDD[_],
+    func: (TaskContext, Iterator[_]) => _,
+    partitions: Array[Int],
+    allowLocal: Boolean,
+    callSite: String,
+    listener: JobListener,
+    properties: Properties = null
+  ) extends DAGSchedulerEvent
+
+/** Carries a task that has begun, together with its TaskInfo. */
+private[spark] case class BeginEvent(task: Task[_], taskInfo: TaskInfo)
+  extends DAGSchedulerEvent
+
+/**
+ * Carries a task that has ended: `reason` says how it ended, and the remaining fields hold
+ * whatever result, accumulator updates, info and metrics accompany it.
+ */
+private[spark] case class CompletionEvent(
+    task: Task[_],
+    reason: TaskEndReason,
+    result: Any,
+    accumUpdates: Map[Long, Any], // scala.collection.mutable.Map, per this file's imports
+    taskInfo: TaskInfo,
+    taskMetrics: TaskMetrics
+  ) extends DAGSchedulerEvent
+
+/** An executor with id `execId` on `host` has been gained. */
+private[spark] case class ExecutorGained(execId: String, host: String)
+  extends DAGSchedulerEvent
+
+/** The executor with id `execId` has been lost. */
+private[spark] case class ExecutorLost(execId: String)
+  extends DAGSchedulerEvent
+
+/** `taskSet` has failed, with a textual `reason`. */
+private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String)
+  extends DAGSchedulerEvent
+
+/** Stop signal for the DAGScheduler (presumably ends its event loop; handled elsewhere). */
+private[spark] case object StopDAGScheduler extends DAGSchedulerEvent
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
new file mode 100644
index 0000000..ce0dc90
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -0,0 +1,30 @@
+package org.apache.spark.scheduler
+
+import com.codahale.metrics.{Gauge,MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+
+private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "DAGScheduler"
+
+  /**
+   * Registers an Int gauge named MetricRegistry.name(group, metric, "number") whose value is
+   * obtained by re-evaluating `read` every time the gauge is sampled.
+   */
+  private def registerIntGauge(group: String, metric: String)(read: => Int) {
+    val gauge = new Gauge[Int] {
+      override def getValue: Int = read
+    }
+    metricRegistry.register(MetricRegistry.name(group, metric, "number"), gauge)
+  }
+
+  // Stage counts, read from the scheduler's stage collections on each sample.
+  registerIntGauge("stage", "failedStages") { dagScheduler.failed.size }
+  registerIntGauge("stage", "runningStages") { dagScheduler.running.size }
+  registerIntGauge("stage", "waitingStages") { dagScheduler.waiting.size }
+
+  // Job counts; the next job id stands in for the number of jobs submitted so far.
+  registerIntGauge("job", "allJobs") { dagScheduler.nextJobId.get() }
+  registerIntGauge("job", "activeJobs") { dagScheduler.activeJobs.size }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
new file mode 100644
index 0000000..370ccd1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{Logging, SparkEnv}
+import scala.collection.immutable.Set
+import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.conf.Configuration
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.JavaConversions._
+
+
+/**
+ * Parses and holds information about an input format (and the files it reads) that was
+ * specified as a parameter.
+ *
+ * @param configuration    Hadoop configuration the split-listing JobConf is built from
+ * @param inputFormatClazz class implementing either the new (mapreduce) or the old (mapred)
+ *                         Hadoop InputFormat API; anything else is rejected during construction
+ * @param path             input path(s), passed to FileInputFormat.setInputPaths
+ * @throws IllegalArgumentException (from the constructor) if inputFormatClazz implements
+ *                                  neither supported API
+ */
+class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Class[_],
+    val path: String) extends Logging {
+
+  // validate() sets exactly one of these to true during construction.
+  var mapreduceInputFormat: Boolean = false
+  var mapredInputFormat: Boolean = false
+
+  validate()
+
+  override def toString(): String = {
+    "InputFormatInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz + ", path : " + path
+  }
+
+  // Must agree with equals: only inputFormatClazz and path participate.
+  override def hashCode(): Int = inputFormatClazz.hashCode * 31 + path.hashCode
+
+  // The path is not canonicalized, so e.g. a relative and an absolute spelling of the same
+  // location compare unequal. That is acceptable because this is only a best-effort way to
+  // remove duplicates. The configuration is deliberately not compared.
+  override def equals(other: Any): Boolean = other match {
+    case that: InputFormatInfo =>
+      this.inputFormatClazz == that.inputFormatClazz && this.path == that.path
+    case _ => false
+  }
+
+  /**
+   * Records which Hadoop InputFormat API inputFormatClazz implements, preferring the new
+   * mapreduce API when both match.
+   *
+   * @throws IllegalArgumentException if the class implements neither API
+   */
+  private def validate() {
+    logDebug("validate InputFormatInfo : " + inputFormatClazz + ", path " + path)
+
+    try {
+      if (classOf[org.apache.hadoop.mapreduce.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+        logDebug("inputformat is from mapreduce package")
+        mapreduceInputFormat = true
+      } else if (classOf[org.apache.hadoop.mapred.InputFormat[_, _]].isAssignableFrom(inputFormatClazz)) {
+        logDebug("inputformat is from mapred package")
+        mapredInputFormat = true
+      } else {
+        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz +
+          " is NOT a supported input format ? does not implement either of the supported hadoop api's")
+      }
+    } catch {
+      case e: ClassNotFoundException =>
+        throw new IllegalArgumentException("Specified inputformat " + inputFormatClazz + " cannot be found ?", e)
+    }
+  }
+
+  /**
+   * Builds the JobConf both split-listing paths need: a copy of `configuration`, with
+   * credentials added through SparkEnv.get.hadoop.addCredentials and the input paths set
+   * to `path`.
+   */
+  private def newJobConf(): JobConf = {
+    val env = SparkEnv.get
+    val jobConf = new JobConf(configuration)
+    env.hadoop.addCredentials(jobConf)
+    FileInputFormat.setInputPaths(jobConf, path)
+    jobConf
+  }
+
+  // Lists splits through the new (org.apache.hadoop.mapreduce) API. No failures are expected
+  // here because validate() has already established that inputFormatClazz implements it.
+  private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = {
+    val conf = newJobConf()
+    val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] =
+      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], conf).asInstanceOf[
+        org.apache.hadoop.mapreduce.InputFormat[_, _]]
+    val job = new Job(conf)
+
+    val splitInfos = new ArrayBuffer[SplitInfo]()
+    // getSplits returns a java.util.List; JavaConversions makes it iterable here.
+    for (split <- instance.getSplits(job)) {
+      splitInfos ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
+    }
+    splitInfos.toSet
+  }
+
+  // Lists splits through the old (org.apache.hadoop.mapred) API, passing
+  // jobConf.getNumMapTasks() as the split-count argument. No failures are expected here
+  // because validate() has already established that inputFormatClazz implements this API.
+  private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = {
+    val jobConf = newJobConf()
+    val instance: org.apache.hadoop.mapred.InputFormat[_, _] =
+      ReflectionUtils.newInstance(inputFormatClazz.asInstanceOf[Class[_]], jobConf).asInstanceOf[
+        org.apache.hadoop.mapred.InputFormat[_, _]]
+
+    val splitInfos = new ArrayBuffer[SplitInfo]()
+    for (split <- instance.getSplits(jobConf, jobConf.getNumMapTasks())) {
+      splitInfos ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split)
+    }
+    splitInfos.toSet
+  }
+
+  // Dispatches to the split-listing path that matches the API detected by validate().
+  private def findPreferredLocations(): Set[SplitInfo] = {
+    logDebug("mapreduceInputFormat : " + mapreduceInputFormat + ", mapredInputFormat : " + mapredInputFormat +
+      ", inputFormatClazz : " + inputFormatClazz)
+    if (mapreduceInputFormat) {
+      prefLocsFromMapreduceInputFormat()
+    } else {
+      assert(mapredInputFormat)
+      prefLocsFromMapredInputFormat()
+    }
+  }
+}
+
+
+
+
+object InputFormatInfo {
+  /**
+   * Computes the preferred locations for the given input formats and returns a map from host
+   * to the set of splits located on that host.
+   *
+   * A typical allocation algorithm built on this map (what the YARN branch currently does):
+   *  a) For each host, count the number of splits hosted on that host.
+   *  b) Decrement the containers currently allocated on that host.
+   *  c) Compute rack info for each host and update the rack -> count map based on (b).
+   *  d) Allocate nodes based on (c).
+   *  e) On the allocation result, ensure that we don't allocate "too many" jobs on a single
+   *     node (even if data locality on it is very high). This keeps the job from becoming
+   *     fragile when a single host (or a small set of hosts) goes down.
+   *
+   * Repeat from (a) until the required nodes are allocated. If a node dies, follow the same
+   * procedure.
+   */
+  def computePreferredLocations(formats: Seq[InputFormatInfo]): HashMap[String, HashSet[SplitInfo]] = {
+    val hostToSplits = new HashMap[String, HashSet[SplitInfo]]
+    for {
+      info <- formats
+      split <- info.findPreferredLocations()
+    } {
+      hostToSplits.getOrElseUpdate(split.hostLocation, new HashSet[SplitInfo]) += split
+    }
+    hostToSplits
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
new file mode 100644
index 0000000..50c2b9a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobListener.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Callback interface for a job submitted to the DAGScheduler. The listener is told about each
+ * task that succeeds, and about failure of the job as a whole; after a failure, no further
+ * taskSucceeded calls are made.
+ */
+private[spark] trait JobListener {
+  /** Called each time a task succeeds, with that task's `index` and its `result`. */
+  def taskSucceeded(index: Int, result: Any): Unit
+
+  /** Called when the whole job fails; `exception` describes the failure. */
+  def jobFailed(exception: Exception): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
new file mode 100644
index 0000000..98ef4d1
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.PrintWriter
+import java.io.File
+import java.io.FileNotFoundException
+import java.text.SimpleDateFormat
+import java.util.{Date, Properties}
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.{Map, HashMap, ListBuffer}
+import scala.io.Source
+
+import org.apache.spark._
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.scheduler.cluster.TaskInfo
+
+/**
+ * A SparkListener that records runtime information for each job (the RDD and stage graph,
+ * task start/stop and shuffle information, and the job description property) in one log file
+ * per job, at `<logDir>/<logDirName>/<jobId>`. Here logDir is $SPARK_LOG_DIR when set and
+ * /tmp/spark otherwise.
+ *
+ * @param logDirName name of this logger's sub-directory; the no-arg constructor uses the
+ *                   current time in milliseconds
+ */
+class JobLogger(val logDirName: String) extends SparkListener with Logging {
+  private val logDir = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark")
+  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
+  private val stageIDToJobID = new HashMap[Int, Int]
+  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+
+  createLogDir()
+  def this() = this(String.valueOf(System.currentTimeMillis()))
+
+  def getLogDir = logDir
+  def getJobIDtoPrintWriter = jobIDToPrintWriter
+  def getStageIDToJobID = stageIDToJobID
+  def getJobIDToStages = jobIDToStages
+  def getEventQueue = eventQueue
+
+  // Create this logger's folder, named logDirName (by default the logger's creation time).
+  protected def createLogDir() {
+    val dir = new File(logDir + "/" + logDirName + "/")
+    if (!dir.exists() && !dir.mkdirs()) {
+      logError("create log directory error:" + logDir + "/" + logDirName + "/")
+    }
+  }
+
+  // Create the log file for one job; the file name is the job ID.
+  protected def createLogWriter(jobID: Int) {
+    try {
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
+      jobIDToPrintWriter += (jobID -> fileWriter)
+    } catch {
+      // Without a writer, jobLogInfo silently drops every line for this job, so report the
+      // failure through the logger instead of only printing a stack trace to stderr.
+      case e: FileNotFoundException =>
+        logError("create log file error:" + logDir + "/" + logDirName + "/" + jobID, e)
+    }
+  }
+
+  // Close the job's log file and forget its stages in stageIDToJobID and jobIDToStages.
+  protected def closeLogWriter(jobID: Int) {
+    jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+      fileWriter.close()
+      jobIDToStages.get(jobID).foreach(_.foreach(stage => stageIDToJobID -= stage.id))
+      jobIDToPrintWriter -= jobID
+      jobIDToStages -= jobID
+    }
+  }
+
+  // Write one line to the job's log file; withTime controls whether a timestamp prefix is
+  // added. Lines for jobs without an open writer are dropped.
+  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+    val writeInfo =
+      if (withTime) DATE_FORMAT.format(new Date(System.currentTimeMillis())) + ": " + info
+      else info
+    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+  }
+
+  // Write one line to the log file of the job that owns stageID, if that stage is known.
+  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
+    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
+  }
+
+  // Register `stage` and, recursively, those of its ancestors that belong to the same job.
+  protected def buildJobDep(jobID: Int, stage: Stage) {
+    if (stage.jobId == jobID) {
+      jobIDToStages.getOrElseUpdate(jobID, new ListBuffer[Stage]) += stage
+      stageIDToJobID += (stage.id -> jobID)
+      stage.parents.foreach(buildJobDep(jobID, _))
+    }
+  }
+
+  // For every stage of the job, log the RDDs computed inside it and its parent stages.
+  protected def recordStageDep(jobID: Int) {
+    // `rdd` plus every ancestor reachable without crossing a shuffle dependency.
+    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
+      val rddList = new ListBuffer[RDD[_]]
+      rddList += rdd
+      rdd.dependencies.foreach {
+        case shufDep: ShuffleDependency[_, _] =>
+        case dep => rddList ++= getRddsInStage(dep.rdd)
+      }
+      rddList
+    }
+    jobIDToStages.get(jobID).foreach(_.foreach { stage =>
+      val depRddDesc = getRddsInStage(stage.rdd).map(_.id).mkString(",")
+      val depStageDesc = stage.parents.map(parent =>
+        "(" + parent.id + "," + parent.shuffleDep.get.shuffleId + ")").mkString
+      jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + depRddDesc + ")" +
+        " STAGE_DEP=" + depStageDesc, false)
+    })
+  }
+
+  // Generate `indent` spaces.
+  protected def indentString(indent: Int): String = " " * indent
+
+  // rdd.name when it is non-null, otherwise the RDD's class name.
+  protected def getRddName(rdd: RDD[_]): String = Option(rdd.name).getOrElse(rdd.getClass.getName)
+
+  // Log `rdd` and its same-stage ancestors as an indented tree. Shuffle dependencies are
+  // logged as SHUFFLE_ID leaves and not descended into.
+  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
+    val rddInfo = "RDD_ID=" + rdd.id + "(" + getRddName(rdd) + "," + rdd.generator + ")"
+    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
+    rdd.dependencies.foreach {
+      case shufDep: ShuffleDependency[_, _] =>
+        val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
+        jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
+      case dep => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
+    }
+  }
+
+  // Log the stage graph rooted at `stage`. A stage owned by another job is logged with that
+  // job's JOB_ID and is not expanded.
+  protected def recordStageDepGraph(jobID: Int, stage: Stage, indent: Int = 0) {
+    val stageInfo =
+      if (stage.isShuffleMap) {
+        "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
+      } else {
+        "STAGE_ID=" + stage.id + " RESULT_STAGE"
+      }
+    if (stage.jobId == jobID) {
+      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
+      recordRddInStageGraph(jobID, stage.rdd, indent)
+      stage.parents.foreach(recordStageDepGraph(jobID, _, indent + 2))
+    } else {
+      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
+    }
+  }
+
+  // Record task metrics into the job's log file.
+  protected def recordTaskMetrics(stageID: Int, status: String,
+      taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
+      " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
+      " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
+    val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
+    val readMetrics = taskMetrics.shuffleReadMetrics match {
+      case Some(metrics) =>
+        " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime +
+        " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched +
+        " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched +
+        " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched +
+        " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime +
+        " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime +
+        " REMOTE_BYTES_READ=" + metrics.remoteBytesRead
+      case None => ""
+    }
+    val writeMetrics = taskMetrics.shuffleWriteMetrics match {
+      case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
+      case None => ""
+    }
+    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+  }
+
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
+    stageLogInfo(
+      stageSubmitted.stage.id,
+      "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+        stageSubmitted.stage.id, stageSubmitted.taskSize))
+  }
+
+  override def onStageCompleted(stageCompleted: StageCompleted) {
+    stageLogInfo(
+      stageCompleted.stageInfo.stage.id,
+      "STAGE_ID=%d STATUS=COMPLETED".format(stageCompleted.stageInfo.stage.id))
+  }
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) { }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val task = taskEnd.task
+    val taskInfo = taskEnd.taskInfo
+    var taskStatus = task match {
+      case resultTask: ResultTask[_, _] => "TASK_TYPE=RESULT_TASK"
+      case shuffleMapTask: ShuffleMapTask => "TASK_TYPE=SHUFFLE_MAP_TASK"
+    }
+    taskEnd.reason match {
+      case Success =>
+        taskStatus += " STATUS=SUCCESS"
+        recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
+      case Resubmitted =>
+        taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
+          " STAGE_ID=" + task.stageId
+        stageLogInfo(task.stageId, taskStatus)
+      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
+        taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
+          task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
+          mapId + " REDUCE_ID=" + reduceId
+        stageLogInfo(task.stageId, taskStatus)
+      case OtherFailure(message) =>
+        taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId +
+          " STAGE_ID=" + task.stageId + " INFO=" + message
+        stageLogInfo(task.stageId, taskStatus)
+      case _ =>
+    }
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
+    val job = jobEnd.job
+    // Build the line without a trailing separator. Unconditionally chopping the last character
+    // would also cut the final letter of "STATUS=SUCCESS".
+    val status = jobEnd.jobResult match {
+      case JobSucceeded => " STATUS=SUCCESS"
+      case JobFailed(exception, _) =>
+        // getMessage may be null. The message's words are joined with '_' so the reason stays
+        // a single whitespace-free token.
+        val message = Option(exception.getMessage).getOrElse("").trim
+        " STATUS=FAILED REASON=" + message.split("\\s+").mkString("_")
+    }
+    jobLogInfo(job.jobId, ("JOB_ID=" + job.jobId + status).toUpperCase)
+    closeLogWriter(job.jobId)
+  }
+
+  // Log the job's SparkContext.SPARK_JOB_DESCRIPTION property (empty if unset), when the job
+  // has properties at all.
+  protected def recordJobProperties(jobID: Int, properties: Properties) {
+    if (properties != null) {
+      val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
+      jobLogInfo(jobID, description, false)
+    }
+  }
+
+  override def onJobStart(jobStart: SparkListenerJobStart) {
+    val job = jobStart.job
+    val properties = jobStart.properties
+    createLogWriter(job.jobId)
+    recordJobProperties(job.jobId, properties)
+    buildJobDep(job.jobId, job.finalStage)
+    recordStageDep(job.jobId)
+    recordStageDepGraph(job.jobId, job.finalStage)
+    jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
new file mode 100644
index 0000000..c381348
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * Outcome of a job run by the DAGScheduler: either JobSucceeded or JobFailed.
+ */
+private[spark] sealed trait JobResult
+
+/** The job completed successfully. */
+private[spark] case object JobSucceeded
+  extends JobResult
+
+/**
+ * The job failed with `exception`. `failedStage` optionally identifies a Stage associated with
+ * the failure; it may be None.
+ */
+private[spark] case class JobFailed(
+    exception: Exception,
+    failedStage: Option[Stage])
+  extends JobResult
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
new file mode 100644
index 0000000..200d881
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * An object that waits for a DAGScheduler job to complete. As tasks finish, it passes their
+ * results to the given handler function.
+ *
+ * @param totalTasks    number of successful tasks after which the job counts as succeeded; a
+ *                      job with zero tasks is treated as already succeeded
+ * @param resultHandler called with (index, result) for each successful task, while this
+ *                      waiter's lock is held
+ */
+private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit)
+  extends JobListener {
+
+  private var finishedTasks = 0
+
+  // Is the job as a whole finished (succeeded or failed)? A job with no tasks never receives
+  // a taskSucceeded() call, so it must start out finished or awaitResult() would block forever.
+  private var jobFinished = totalTasks == 0
+
+  // If the job is finished, this will be its result.
+  private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
+
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("taskSucceeded() called on a finished JobWaiter")
+      }
+      resultHandler(index, result.asInstanceOf[T])
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        jobFinished = true
+        jobResult = JobSucceeded
+        this.notifyAll()
+      }
+    }
+  }
+
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      if (jobFinished) {
+        throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter")
+      }
+      jobFinished = true
+      jobResult = JobFailed(exception, None)
+      this.notifyAll()
+    }
+  }
+
+  /** Blocks until the job has finished, then returns its result. */
+  def awaitResult(): JobResult = synchronized {
+    // Re-check in a loop: Object.wait() can return without a notify (spurious wakeup).
+    while (!jobFinished) {
+      this.wait()
+    }
+    jobResult
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
new file mode 100644
index 0000000..1c61687
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.storage.BlockManagerId
+import java.io.{ObjectOutput, ObjectInput, Externalizable}
+
+/**
+ * What a ShuffleMapTask returns to the scheduler: the address of the block manager the task
+ * ran on, plus the size of its output for each reducer, for passing on to the reduce tasks.
+ * The map output sizes are compressed using MapOutputTracker.compressSize.
+ */
+private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
+  extends Externalizable {
+
+  /** No-argument constructor, for deserialization only; readExternal fills in the fields. */
+  def this() = this(null, null)
+
+  /** Wire format: the BlockManagerId, then an Int byte count, then that many size bytes. */
+  def writeExternal(out: ObjectOutput) {
+    val sizes = compressedSizes
+    location.writeExternal(out)
+    out.writeInt(sizes.length)
+    out.write(sizes)
+  }
+
+  /** Reads the format produced by writeExternal, replacing both fields. */
+  def readExternal(in: ObjectInput) {
+    location = BlockManagerId(in)
+    val sizeCount = in.readInt()
+    val buffer = new Array[Byte](sizeCount)
+    compressedSizes = buffer
+    in.readFully(buffer)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
new file mode 100644
index 0000000..2f157cc
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark._
+import java.io._
+import util.{MetadataCleaner, TimeStampedHashMap}
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
private[spark] object ResultTask {

  // A simple map between the stage id to the serialized byte array of a task.
  // Served as a cache for task serialization because serialization can be
  // expensive on the master node if it needs to launch thousands of tasks.
  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

  val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.clearOldValues)

  /**
   * Serializes `rdd` and `func` with the closure serializer into a GZIP-compressed byte array.
   * The result is cached per `stageId`, so later calls for the same stage return the cached
   * bytes and ignore their `rdd`/`func` arguments.
   */
  def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
    synchronized {
      serializedInfoCache.get(stageId).getOrElse {
        val out = new ByteArrayOutputStream
        val ser = SparkEnv.get.closureSerializer.newInstance
        val objOut = ser.serializeStream(new GZIPOutputStream(out))
        try {
          objOut.writeObject(rdd)
          objOut.writeObject(func)
        } finally {
          // Always close so the GZIP deflater's native resources are released, even on failure.
          objOut.close()
        }
        val bytes = out.toByteArray
        serializedInfoCache.put(stageId, bytes)
        bytes
      }
    }
  }

  /**
   * Inverse of serializeInfo: decompresses `bytes` and reads back the (rdd, func) pair.
   * `stageId` is not used here; it is kept for interface compatibility.
   */
  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
    val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
    val ser = SparkEnv.get.closureSerializer.newInstance
    val objIn = ser.deserializeStream(in)
    try {
      val rdd = objIn.readObject().asInstanceOf[RDD[_]]
      val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
      (rdd, func)
    } finally {
      // Closing releases the GZIP inflater's native memory instead of waiting for finalization.
      objIn.close()
    }
  }

  /** Drops every cached serialized (rdd, func) entry. */
  def clearCache() {
    synchronized {
      serializedInfoCache.clear()
    }
  }
}
+
+
/**
 * A task that applies `func` to one partition of `rdd` and returns the function's result.
 *
 * The task is shipped with Externalizable: the (rdd, func) pair is serialized once per stage via
 * ResultTask.serializeInfo, followed by partition, outputId, epoch and the Partition object.
 * readExternal must read the fields in exactly the same order.
 */
private[spark] class ResultTask[T, U](
    stageId: Int,
    var rdd: RDD[T],
    var func: (TaskContext, Iterator[T]) => U,
    var partition: Int,
    @transient locs: Seq[TaskLocation],
    var outputId: Int)
  extends Task[U](stageId) with Externalizable {

  /** No-arg constructor required by Externalizable; readExternal fills in the fields. */
  def this() = this(0, null, null, 0, null, 0)

  var split = if (rdd == null) null else rdd.partitions(partition)

  // Deduplicated preferred locations; not written by writeExternal.
  @transient private val preferredLocs: Seq[TaskLocation] = {
    if (locs == null) Nil else locs.toSet.toSeq
  }

  /** Runs `func` over this task's partition, always firing the completion callbacks. */
  override def run(attemptId: Long): U = {
    val context = new TaskContext(stageId, partition, attemptId)
    metrics = Some(context.taskMetrics)
    try {
      func(context, rdd.iterator(split, context))
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }

  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString = "ResultTask(" + stageId + ", " + partition + ")"

  override def writeExternal(out: ObjectOutput) {
    RDDCheckpointData.synchronized {
      split = rdd.partitions(partition)
      out.writeInt(stageId)
      val bytes = ResultTask.serializeInfo(
        stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
      out.writeInt(bytes.length)
      out.write(bytes)
      out.writeInt(partition)
      out.writeInt(outputId)
      out.writeLong(epoch)
      out.writeObject(split)
    }
  }

  override def readExternal(in: ObjectInput) {
    // NOTE(review): the stage id is only used to deserialize the closure; it is not written back
    // to this task's stageId, which is a constructor value and cannot be reassigned here.
    val serializedStageId = in.readInt()
    val numBytes = in.readInt()
    val bytes = new Array[Byte](numBytes)
    in.readFully(bytes)
    val (rdd_, func_) = ResultTask.deserializeInfo(serializedStageId, bytes)
    rdd = rdd_.asInstanceOf[RDD[T]]
    func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
    partition = in.readInt()
    // Assign the field (not a local) so the deserialized task keeps its output id.
    outputId = in.readInt()
    epoch = in.readLong()
    split = in.readObject().asInstanceOf[Partition]
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
new file mode 100644
index 0000000..ca716b4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark._
+import org.apache.spark.executor.ShuffleWriteMetrics
+import org.apache.spark.storage._
+import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+
+
private[spark] object ShuffleMapTask {

  // A simple map between the stage id to the serialized byte array of a task.
  // Served as a cache for task serialization because serialization can be
  // expensive on the master node if it needs to launch thousands of tasks.
  val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]

  val metadataCleaner = new MetadataCleaner("ShuffleMapTask", serializedInfoCache.clearOldValues)

  /**
   * Serializes `rdd` and `dep` with the closure serializer into a GZIP-compressed byte array.
   * The result is cached per `stageId`, so later calls for the same stage return the cached
   * bytes and ignore their `rdd`/`dep` arguments.
   */
  def serializeInfo(stageId: Int, rdd: RDD[_], dep: ShuffleDependency[_,_]): Array[Byte] = {
    synchronized {
      serializedInfoCache.get(stageId).getOrElse {
        val out = new ByteArrayOutputStream
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val objOut = ser.serializeStream(new GZIPOutputStream(out))
        try {
          objOut.writeObject(rdd)
          objOut.writeObject(dep)
        } finally {
          // Always close so the GZIP deflater's native resources are released, even on failure.
          objOut.close()
        }
        val bytes = out.toByteArray
        serializedInfoCache.put(stageId, bytes)
        bytes
      }
    }
  }

  /**
   * Inverse of serializeInfo: decompresses `bytes` and reads back the (rdd, dep) pair.
   * `stageId` is not used here; it is kept for interface compatibility.
   */
  def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], ShuffleDependency[_,_]) = {
    synchronized {
      val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val objIn = ser.deserializeStream(in)
      try {
        val rdd = objIn.readObject().asInstanceOf[RDD[_]]
        val dep = objIn.readObject().asInstanceOf[ShuffleDependency[_,_]]
        (rdd, dep)
      } finally {
        // Closing releases the GZIP inflater's native memory instead of waiting for finalization.
        objIn.close()
      }
    }
  }

  // Since both the JarSet and FileSet have the same format this is used for both.
  // Decodes a GZIP-compressed, Java-serialized Array[(String, Long)] into a mutable HashMap;
  // for duplicate names the last entry wins.
  def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = {
    val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
    try {
      val set = objIn.readObject().asInstanceOf[Array[(String, Long)]].toMap
      HashMap(set.toSeq: _*)
    } finally {
      objIn.close()
    }
  }

  /** Drops every cached serialized (rdd, dep) entry. */
  def clearCache() {
    synchronized {
      serializedInfoCache.clear()
    }
  }
}
+
/**
 * A task that computes one partition of `rdd` and splits its records into shuffle buckets
 * according to `dep.partitioner`, returning a MapStatus describing where and how much it wrote.
 *
 * Shipped with Externalizable: stage id, the per-stage (rdd, dep) bytes from
 * ShuffleMapTask.serializeInfo, partition, epoch and the Partition object, in that order.
 */
private[spark] class ShuffleMapTask(
    stageId: Int,
    var rdd: RDD[_],
    var dep: ShuffleDependency[_,_],
    var partition: Int,
    @transient private var locs: Seq[TaskLocation])
  extends Task[MapStatus](stageId)
  with Externalizable
  with Logging {

  /** No-arg constructor required by Externalizable; readExternal fills in the fields. */
  protected def this() = this(0, null, null, 0, null)

  // Deduplicated preferred locations; not written by writeExternal.
  @transient private val preferredLocs: Seq[TaskLocation] =
    if (locs == null) Nil else locs.toSet.toSeq

  var split = if (rdd == null) null else rdd.partitions(partition)

  override def writeExternal(out: ObjectOutput) {
    RDDCheckpointData.synchronized {
      split = rdd.partitions(partition)
      out.writeInt(stageId)
      val taskInfo = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
      out.writeInt(taskInfo.length)
      out.write(taskInfo)
      out.writeInt(partition)
      out.writeLong(epoch)
      out.writeObject(split)
    }
  }

  override def readExternal(in: ObjectInput) {
    // Fields are read in exactly the order writeExternal wrote them.
    val serializedStageId = in.readInt()
    val taskInfo = new Array[Byte](in.readInt())
    in.readFully(taskInfo)
    val (deserializedRdd, deserializedDep) =
      ShuffleMapTask.deserializeInfo(serializedStageId, taskInfo)
    rdd = deserializedRdd
    dep = deserializedDep
    partition = in.readInt()
    epoch = in.readLong()
    split = in.readObject().asInstanceOf[Partition]
  }

  /**
   * Writes every record of this partition to its shuffle bucket and returns the resulting
   * MapStatus. On failure the partial writes are reverted and the exception is rethrown; the
   * writers are always released and completion callbacks always run.
   */
  override def run(attemptId: Long): MapStatus = {
    val numBuckets = dep.partitioner.numPartitions

    val taskContext = new TaskContext(stageId, partition, attemptId)
    metrics = Some(taskContext.taskMetrics)

    val blockManager = SparkEnv.get.blockManager
    var shuffle: ShuffleBlocks = null
    var buckets: ShuffleWriterGroup = null

    try {
      // One block writer per output bucket for this map partition.
      val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
      shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numBuckets, ser)
      buckets = shuffle.acquireWriters(partition)

      // Route each record to the bucket its key maps to.
      rdd.iterator(split, taskContext).foreach { record =>
        val pair = record.asInstanceOf[Product2[Any, Any]]
        val bucket = dep.partitioner.getPartition(pair._1)
        buckets.writers(bucket).write(pair)
      }

      // Commit and close each writer, collecting its compressed size and the total bytes.
      var bytesWritten = 0L
      val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
        writer.commit()
        writer.close()
        val blockSize = writer.size()
        bytesWritten += blockSize
        MapOutputTracker.compressSize(blockSize)
      }

      val writeMetrics = new ShuffleWriteMetrics
      writeMetrics.shuffleBytesWritten = bytesWritten
      metrics.get.shuffleWriteMetrics = Some(writeMetrics)

      new MapStatus(blockManager.blockManagerId, compressedSizes)
    } catch {
      case e: Exception =>
        // Undo any partially written output before propagating the failure.
        if (buckets != null) {
          buckets.writers.foreach(_.revertPartialWrites())
        }
        throw e
    } finally {
      // Hand the writers back to the shuffle block manager, then fire completion callbacks.
      if (shuffle != null && buckets != null) {
        shuffle.releaseWriters(buckets)
      }
      taskContext.executeOnCompleteCallbacks()
    }
  }

  override def preferredLocations: Seq[TaskLocation] = preferredLocs

  override def toString = "ShuffleMapTask(%d, %d)".format(stageId, partition)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
new file mode 100644
index 0000000..3504424
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+import org.apache.spark.scheduler.cluster.TaskInfo
+import org.apache.spark.util.Distribution
+import org.apache.spark.{Logging, SparkContext, TaskEndReason, Utils}
+import org.apache.spark.executor.TaskMetrics
+
/** Events dispatched to SparkListeners by SparkListenerBus. */
sealed trait SparkListenerEvents

/** A stage was submitted; `taskSize` and `properties` are as supplied by the poster. */
case class SparkListenerStageSubmitted(stage: Stage, taskSize: Int, properties: Properties)
  extends SparkListenerEvents

/** A stage finished; `stageInfo` carries the per-task info and metrics gathered for it. */
case class StageCompleted(stageInfo: StageInfo) extends SparkListenerEvents

/** A task started running. */
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents

/** A task ended, for the given reason, with its info and metrics. */
case class SparkListenerTaskEnd(
    task: Task[_],
    reason: TaskEndReason,
    taskInfo: TaskInfo,
    taskMetrics: TaskMetrics)
  extends SparkListenerEvents

/** A job started; `properties` defaults to null when none were supplied. */
case class SparkListenerJobStart(job: ActiveJob, properties: Properties = null)
  extends SparkListenerEvents

/** A job ended with the given result. */
case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
  extends SparkListenerEvents
+
/**
 * Receives the scheduler events dispatched by SparkListenerBus. Every callback defaults to a
 * no-op, so implementations override only the events they care about.
 */
trait SparkListener {

  /** Called when a stage is completed, with information on the completed stage. */
  def onStageCompleted(stageCompleted: StageCompleted): Unit = {}

  /** Called when a stage is submitted. */
  def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {}

  /**
   * Called when a task starts.
   *
   * @param taskEnd the task-start event (the parameter name is historical; it is not an end event)
   */
  def onTaskStart(taskEnd: SparkListenerTaskStart): Unit = {}

  /** Called when a task ends. */
  def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {}

  /** Called when a job starts. */
  def onJobStart(jobStart: SparkListenerJobStart): Unit = {}

  /** Called when a job ends. */
  def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {}
}
+
/**
 * SparkListener that, whenever a stage completes, logs percentile summaries of its tasks'
 * runtime, shuffle I/O, result size and how each task's time splits between execution,
 * fetch waiting and everything else.
 */
class StatsReportListener extends SparkListener with Logging {
  override def onStageCompleted(stageCompleted: StageCompleted) {
    import org.apache.spark.scheduler.StatsReportListener._
    // Picked up implicitly by the show*Distribution helpers below.
    implicit val stage = stageCompleted
    // `this.` is required: the companion object also defines logInfo and is imported above.
    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
    showMillisDistribution("task runtime:", (info, _) => Some(info.duration))

    // Shuffle write.
    showBytesDistribution("shuffle bytes written:",
      (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))

    // Fetch & I/O.
    showMillisDistribution("fetch wait time:",
      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
    showBytesDistribution("remote bytes read:",
      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
    showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))

    // Runtime breakdown, as percentages of each task's duration.
    val runtimePcts = stageCompleted.stageInfo.taskInfos.map { case (info, metrics) =>
      RuntimePercentage(info.duration, metrics)
    }
    val pctFormat = "%2.0f %%"
    showDistribution("executor (non-fetch) time pct: ",
      Distribution(runtimePcts.map(_.executorPct * 100)), pctFormat)
    showDistribution("fetch wait time pct: ",
      Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), pctFormat)
    showDistribution("other time pct: ",
      Distribution(runtimePcts.map(_.other * 100)), pctFormat)
  }
}
+
/**
 * Helpers used by StatsReportListener to turn per-task metrics of a completed stage into
 * distributions and log them as summary statistics plus selected percentiles.
 */
object StatsReportListener extends Logging {

  // For profiling, the extremes are more interesting than the middle of the distribution.
  val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100)
  val probabilities = percentiles.map(_ / 100.0)
  val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"

  /** Builds a distribution of `getMetric` over every (TaskInfo, TaskMetrics) of the stage. */
  def extractDoubleDistribution(stage: StageCompleted,
      getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
    Distribution(stage.stageInfo.taskInfos.flatMap {
      case ((info, metric)) => getMetric(info, metric)
    })
  }

  // Is there some way to set up the types so that this Long variant is unnecessary?
  def extractLongDistribution(stage: StageCompleted,
      getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
    extractDoubleDistribution(stage, (info, metric) => getMetric(info, metric).map(_.toDouble))
  }

  /** Logs the summary statistics of `d`, then the percentile header and formatted values. */
  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
    val stats = d.statCounter
    logInfo(heading + stats)
    val quantiles = d.getQuantiles(probabilities).map(formatNumber)
    logInfo(percentilesHeader)
    logInfo("\t" + quantiles.mkString("\t"))
  }

  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String) {
    dOpt.foreach { d => showDistribution(heading, d, formatNumber) }
  }

  def showDistribution(heading: String, dOpt: Option[Distribution], format: String) {
    def f(d: Double) = format.format(d)
    showDistribution(heading, dOpt, f _)
  }

  def showDistribution(heading: String, format: String,
      getMetric: (TaskInfo, TaskMetrics) => Option[Double])
      (implicit stage: StageCompleted) {
    showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
  }

  def showBytesDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
      (implicit stage: StageCompleted) {
    showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
  }

  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
    dOpt.foreach { dist => showBytesDistribution(heading, dist) }
  }

  def showBytesDistribution(heading: String, dist: Distribution) {
    showDistribution(heading, dist, (d => Utils.bytesToString(d.toLong)): Double => String)
  }

  def showMillisDistribution(heading: String, dOpt: Option[Distribution]) {
    showDistribution(heading, dOpt,
      (d => StatsReportListener.millisToString(d.toLong)): Double => String)
  }

  def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
      (implicit stage: StageCompleted) {
    showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
  }

  val seconds = 1000L
  val minutes = seconds * 60
  val hours = minutes * 60

  /**
   * Reformats a time interval in milliseconds using the largest unit it reaches, with one
   * decimal place: e.g. 999 -> "999.0 ms", 1000 -> "1.0 s", 90000 -> "1.5 min".
   */
  def millisToString(ms: Long): String = {
    // Inclusive bounds so exact multiples (1000 ms, 60 s, 60 min) move to the larger unit.
    val (size, units) =
      if (ms >= hours) {
        (ms.toDouble / hours, "hours")
      } else if (ms >= minutes) {
        (ms.toDouble / minutes, "min")
      } else if (ms >= seconds) {
        (ms.toDouble / seconds, "s")
      } else {
        (ms.toDouble, "ms")
      }
    "%.1f %s".format(size, units)
  }
}
+
+
+
/**
 * Fractions (0.0-1.0) of a task's total time spent executing (excluding fetch wait), waiting on
 * shuffle fetches (None if the task read no shuffle data), and on everything else.
 */
case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)

object RuntimePercentage {
  /**
   * Splits `totalTime` using the executor run time and shuffle fetch wait time in `metrics`.
   * NOTE(review): a `totalTime` of 0 divides by zero and yields NaN/Infinity fractions.
   */
  def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
    val denom = totalTime.toDouble
    val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
    val fetchFraction = fetchTime.map(_ / denom)
    val execFraction = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
    val otherFraction = 1.0 - (execFraction + fetchFraction.getOrElse(0.0))
    RuntimePercentage(execFraction, fetchFraction, otherFraction)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
new file mode 100644
index 0000000..a65e1ec
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import org.apache.spark.Logging
+
/**
 * Asynchronously passes SparkListenerEvents to registered SparkListeners.
 *
 * Events are queued by post() and delivered, in order, by a single daemon thread. A listener
 * that throws is logged and skipped for that event; it does not stop delivery to other
 * listeners or of later events.
 */
private[spark] class SparkListenerBus() extends Logging {
  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]

  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
  private val EVENT_QUEUE_CAPACITY = 10000
  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
  // Limits the "queue full" error to one log line (best effort; the flag is not synchronized).
  private var queueFullErrorMessageLogged = false

  // Dispatch loop: blocks on the queue forever; as a daemon it does not keep the JVM alive.
  new Thread("SparkListenerBus") {
    setDaemon(true)
    override def run() {
      while (true) {
        val event = eventQueue.take
        event match {
          case stageSubmitted: SparkListenerStageSubmitted =>
            foreachListener(_.onStageSubmitted(stageSubmitted))
          case stageCompleted: StageCompleted =>
            foreachListener(_.onStageCompleted(stageCompleted))
          case jobStart: SparkListenerJobStart =>
            foreachListener(_.onJobStart(jobStart))
          case jobEnd: SparkListenerJobEnd =>
            foreachListener(_.onJobEnd(jobEnd))
          case taskStart: SparkListenerTaskStart =>
            foreachListener(_.onTaskStart(taskStart))
          case taskEnd: SparkListenerTaskEnd =>
            foreachListener(_.onTaskEnd(taskEnd))
          case _ =>
        }
      }
    }
  }.start()

  /**
   * Applies `f` to every registered listener. An exception from one listener is logged and
   * does not propagate, so it can neither kill the dispatch thread nor starve the remaining
   * listeners.
   */
  private def foreachListener(f: SparkListener => Unit) {
    sparkListeners.foreach { listener =>
      try {
        f(listener)
      } catch {
        case e: Exception =>
          logError("Listener " + listener.getClass.getName + " threw an exception", e)
      }
    }
  }

  def addListener(listener: SparkListener) {
    sparkListeners += listener
  }

  /** Enqueues `event` without blocking; the event is dropped if the queue is full. */
  def post(event: SparkListenerEvents) {
    val eventAdded = eventQueue.offer(event)
    if (!eventAdded && !queueFullErrorMessageLogged) {
      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
        "rate at which tasks are being started by the scheduler.")
      queueFullErrorMessageLogged = true
    }
  }
}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
new file mode 100644
index 0000000..5b40a3e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/SplitInfo.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import collection.mutable.ArrayBuffer
+
/**
 * Describes one placement of an input split: the input format class, one host the split is
 * located on, the path, the split length, and the underlying split object. Both the
 * `mapred` and `mapreduce` split flavours are represented the same way, so callers need not
 * care which API produced them.
 */
class SplitInfo(val inputFormatClazz: Class[_], val hostLocation: String, val path: String,
    val length: Long, val underlyingSplit: Any) {

  override def toString(): String =
    "SplitInfo " + super.toString + " .. inputFormatClazz " + inputFormatClazz +
      ", hostLocation : " + hostLocation + ", path : " + path +
      ", length : " + length + ", underlyingSplit " + underlyingSplit

  // Classic 31-multiplier hash over the fields that equals compares (except underlyingSplit).
  // Int overflow is intentional; only the low 31 bits of length participate.
  override def hashCode(): Int = {
    val fieldHashes = Seq(hostLocation.hashCode, path.hashCode, (length & 0x7fffffff).toInt)
    fieldHashes.foldLeft(inputFormatClazz.hashCode)((acc, h) => acc * 31 + h)
  }

  // Of limited use in practice: most split implementations do not override equals, so two
  // SplitInfos for the same block usually compare equal only if they share the same split object.
  override def equals(other: Any): Boolean = other match {
    case that: SplitInfo =>
      hostLocation == that.hostLocation &&
        inputFormatClazz == that.inputFormatClazz &&
        path == that.path &&
        length == that.length &&
        // Split-specific details (e.g. a FileSplit's start offset) are compared here.
        underlyingSplit == that.underlyingSplit
    case _ => false
  }
}
+
/** Factories that expand a Hadoop input split into one SplitInfo per host it is located on. */
object SplitInfo {

  /** Builds one SplitInfo per entry of `hosts`, in order, all sharing the remaining fields. */
  private def perHost(inputFormatClazz: Class[_], path: String, length: Long,
      hosts: Array[String], underlying: Any): Seq[SplitInfo] = {
    val infos = new ArrayBuffer[SplitInfo]()
    hosts.foreach { host =>
      infos += new SplitInfo(inputFormatClazz, host, path, length, underlying)
    }
    infos
  }

  /** Expands an old-API (`mapred`) split into one SplitInfo per location. */
  def toSplitInfo(inputFormatClazz: Class[_], path: String,
      mapredSplit: org.apache.hadoop.mapred.InputSplit): Seq[SplitInfo] = {
    val length = mapredSplit.getLength
    perHost(inputFormatClazz, path, length, mapredSplit.getLocations, mapredSplit)
  }

  /** Expands a new-API (`mapreduce`) split into one SplitInfo per location. */
  def toSplitInfo(inputFormatClazz: Class[_], path: String,
      mapreduceSplit: org.apache.hadoop.mapreduce.InputSplit): Seq[SplitInfo] = {
    val length = mapreduceSplit.getLength
    perHost(inputFormatClazz, path, length, mapreduceSplit.getLocations, mapreduceSplit)
  }
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
new file mode 100644
index 0000000..87b1fe4
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.net.URI
+
+import org.apache.spark._
+import org.apache.spark.storage.BlockManagerId
+
/**
 * A stage is a set of independent tasks all computing the same function that need to run as part
 * of a Spark job, where all the tasks have the same shuffle dependencies. Each DAG of tasks run
 * by the scheduler is split up into stages at the boundaries where shuffle occurs, and then the
 * DAGScheduler runs these stages in topological order.
 *
 * Each Stage can either be a shuffle map stage (`shuffleDep` is defined), whose tasks' results
 * are input for another stage, or a result stage, whose tasks directly compute the action that
 * initiated a job (e.g. count(), save(), etc). For shuffle map stages we also track, per output
 * partition, the map outputs that have been registered for it.
 *
 * Each Stage also has a jobId, identifying the job that first submitted the stage. When FIFO
 * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
 * faster on failure.
 */
private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_],
    val shuffleDep: Option[ShuffleDependency[_,_]],  // Output shuffle if stage is a map stage
    val parents: List[Stage],
    val jobId: Int,
    callSite: Option[String])
  extends Logging {

  val isShuffleMap = shuffleDep.isDefined
  val numPartitions = rdd.partitions.size
  /** Per partition, the registered map outputs, most recently added first. */
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  /** Number of partitions that currently have at least one registered output. */
  var numAvailableOutputs = 0

  /** When first task was submitted to scheduler. */
  var submissionTime: Option[Long] = None
  var completionTime: Option[Long] = None

  private var nextAttemptId = 0

  val name = callSite.getOrElse(rdd.origin)

  /** Result stages are always available; shuffle map stages once every partition has output. */
  def isAvailable: Boolean = !isShuffleMap || numAvailableOutputs == numPartitions

  def addOutputLoc(partition: Int, status: MapStatus) {
    val hadNoOutputs = outputLocs(partition).isEmpty
    outputLocs(partition) = status :: outputLocs(partition)
    if (hadNoOutputs) {
      numAvailableOutputs += 1
    }
  }

  def removeOutputLoc(partition: Int, bmAddress: BlockManagerId) {
    if (dropOutputs(partition, _.location == bmAddress)) {
      numAvailableOutputs -= 1
    }
  }

  def removeOutputsOnExecutor(execId: String) {
    val lostPartitions = (0 until numPartitions).count { partition =>
      dropOutputs(partition, _.location.executorId == execId)
    }
    numAvailableOutputs -= lostPartitions
    if (lostPartitions > 0) {
      logInfo("%s is now unavailable on executor %s (%d/%d, %s)".format(
        this, execId, numAvailableOutputs, numPartitions, isAvailable))
    }
  }

  /**
   * Removes the outputs of `partition` matching `shouldDrop`. Returns true when the partition
   * had outputs before and has none left afterwards.
   */
  private def dropOutputs(partition: Int, shouldDrop: MapStatus => Boolean): Boolean = {
    val before = outputLocs(partition)
    val after = before.filterNot(shouldDrop)
    outputLocs(partition) = after
    before.nonEmpty && after.isEmpty
  }

  /** Returns a fresh attempt id for this stage: 0, 1, 2, ... */
  def newAttemptId(): Int = {
    val attemptId = nextAttemptId
    nextAttemptId += 1
    attemptId
  }

  override def toString = "Stage " + id

  override def hashCode(): Int = id
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
new file mode 100644
index 0000000..72cb1c9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection._
+import org.apache.spark.executor.TaskMetrics
+
/**
 * Summary of a stage delivered to listeners (see StageCompleted): the stage itself and a mutable
 * buffer of (TaskInfo, TaskMetrics) pairs for its tasks, which starts empty and is presumably
 * filled in by the scheduler. Renders as the stage's RDD.
 */
case class StageInfo(
    stage: Stage,
    taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] = mutable.Buffer[(TaskInfo, TaskMetrics)]()) {
  override def toString = stage.rdd.toString
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
new file mode 100644
index 0000000..598d917
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.serializer.SerializerInstance
+import java.io.{DataInputStream, DataOutputStream}
+import java.nio.ByteBuffer
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import org.apache.spark.util.ByteBufferInputStream
+import scala.collection.mutable.HashMap
+import org.apache.spark.executor.TaskMetrics
+
+/**
+ * A task to execute on a worker node, producing a result of type `T`.
+ *
+ * @param stageId id of the stage this task belongs to
+ */
+private[spark] abstract class Task[T](val stageId: Int) extends Serializable {
+
+  /** Runs the task for the given attempt and returns its result. */
+  def run(attemptId: Long): T
+
+  /** Locations this task would prefer to run on; empty (no preference) unless overridden. */
+  def preferredLocations: Seq[TaskLocation] = Nil
+
+  /** Map output tracker epoch. Will be set by TaskScheduler; holds -1 until then. */
+  var epoch: Long = -1
+
+  /** Metrics for this task, if any have been attached; starts out as `None`. */
+  var metrics: Option[TaskMetrics] = None
+}
+
+/**
+ * Handles transmission of tasks and their dependencies, because this can be slightly tricky. We
+ * need to send the list of JARs and files added to the SparkContext with each task to ensure that
+ * worker nodes find out about them, but we can't make them part of the Task because the user's
+ * code in the task might depend on one of the JARs. Thus we serialize each task as multiple
+ * objects, by first writing out its dependencies.
+ *
+ * Layout written by `serializeWithDependencies` and read by `deserializeWithDependencies`:
+ * {{{
+ *   int numFiles, then numFiles x (UTF name, long timestamp)
+ *   int numJars,  then numJars  x (UTF name, long timestamp)
+ *   all remaining bytes: the task, serialized with the supplied SerializerInstance
+ * }}}
+ */
+private[spark] object Task {
+  /**
+   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext).
+   *
+   * @param task the task to serialize
+   * @param currentFiles file name -> timestamp
+   * @param currentJars JAR name -> timestamp
+   * @param serializer used to serialize `task` itself
+   * @return a buffer holding the two dependency tables followed by the serialized task
+   */
+  def serializeWithDependencies(
+      task: Task[_],
+      currentFiles: HashMap[String, Long],
+      currentJars: HashMap[String, Long],
+      serializer: SerializerInstance)
+    : ByteBuffer = {
+
+    val out = new FastByteArrayOutputStream(4096)
+    val dataOut = new DataOutputStream(out)
+
+    writeDependencies(dataOut, currentFiles)
+    writeDependencies(dataOut, currentJars)
+
+    // Write the task itself and finish. The serializer's buffer is copied by its readable
+    // region rather than via array(), which would ignore offset/position/limit.
+    dataOut.flush()
+    writeByteBuffer(serializer.serialize(task), out)
+    out.trim()
+    ByteBuffer.wrap(out.array)
+  }
+
+  /**
+   * Deserialize the list of dependencies in a task serialized with serializeWithDependencies,
+   * and return the task itself as a serialized ByteBuffer. The caller can then update its
+   * ClassLoaders and deserialize the task.
+   *
+   * @return (taskFiles, taskJars, taskBytes)
+   */
+  def deserializeWithDependencies(serializedTask: ByteBuffer)
+    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
+
+    val in = new ByteBufferInputStream(serializedTask)
+    val dataIn = new DataInputStream(in)
+
+    val taskFiles = readDependencies(dataIn)
+    val taskJars = readDependencies(dataIn)
+
+    // Create a sub-buffer for the rest of the data, which is the serialized Task object.
+    // Relies on ByteBufferInputStream having read serializedTask just up to the task bytes.
+    val subBuffer = serializedTask.slice()
+    (taskFiles, taskJars, subBuffer)
+  }
+
+  /** Writes `deps` as an entry count followed by one (UTF name, long timestamp) pair per entry. */
+  private def writeDependencies(dataOut: DataOutputStream, deps: HashMap[String, Long]): Unit = {
+    dataOut.writeInt(deps.size)
+    for ((name, timestamp) <- deps) {
+      dataOut.writeUTF(name)
+      dataOut.writeLong(timestamp)
+    }
+  }
+
+  /** Reads one table in the format produced by `writeDependencies`. */
+  private def readDependencies(dataIn: DataInputStream): HashMap[String, Long] = {
+    val deps = new HashMap[String, Long]()
+    val count = dataIn.readInt()
+    for (_ <- 0 until count) {
+      deps(dataIn.readUTF()) = dataIn.readLong()
+    }
+    deps
+  }
+
+  /**
+   * Appends the readable bytes of `bb` (from its position up to its limit) to `out`, leaving
+   * `bb`'s position untouched. Works for array-backed buffers at any offset as well as for
+   * direct or read-only buffers, for which `array()` is unusable.
+   */
+  private def writeByteBuffer(bb: ByteBuffer, out: FastByteArrayOutputStream): Unit = {
+    if (bb.hasArray) {
+      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
+    } else {
+      val bytes = new Array[Byte](bb.remaining())
+      bb.duplicate().get(bytes)
+      out.write(bytes)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
new file mode 100644
index 0000000..67c9a67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+/**
+ * A location where a task should run: either a bare host, or a (host, executorID) pair.
+ * For a pair, the task is preferably launched on that executor; the next level of preference
+ * is any executor on the same host.
+ *
+ * The constructor is private; instances come from the companion object's `apply` methods.
+ */
+private[spark]
+class TaskLocation private (val host: String, val executorId: Option[String]) extends Serializable {
+  override def toString: String = "TaskLocation(%s, %s)".format(host, executorId)
+}
+
+/** Factory methods for TaskLocation. */
+private[spark] object TaskLocation {
+  /** A location that names both a host and a specific executor on it. */
+  def apply(host: String, executorId: String): TaskLocation =
+    new TaskLocation(host, Some(executorId))
+
+  /** A location that names only a host, with no executor preference. */
+  def apply(host: String): TaskLocation =
+    new TaskLocation(host, None)
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
new file mode 100644
index 0000000..776675d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io._
+
+import scala.collection.mutable.Map
+import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.{Utils, SparkEnv}
+import java.nio.ByteBuffer
+
+/**
+ * Task result. Also contains updates to accumulator variables.
+ *
+ * Encoded via `Externalizable`:
+ *  - the value, serialized with SparkEnv's serializer and written as a length-prefixed byte array;
+ *  - the number of accumulator updates, then each (long id, object) pair;
+ *  - the metrics object.
+ *
+ * Zero accumulator updates are read back as `accumUpdates == null`, not an empty map.
+ * Conversely, a null `accumUpdates` is written as zero updates. This lets a deserialized
+ * result (or one built with the no-arg constructor) be serialized again.
+ *
+ * TODO: Use of distributed cache to return result is a hack to get around
+ * what seems to be a bug with messages over 60KB in libprocess; fix it
+ * NOTE(review): nothing in this class uses a distributed cache; confirm the TODO still applies.
+ */
+private[spark]
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any], var metrics: TaskMetrics)
+  extends Externalizable
+{
+  /** No-arg constructor required by Externalizable; fields are filled in by readExternal. */
+  def this() = this(null.asInstanceOf[T], null, null)
+
+  override def writeExternal(out: ObjectOutput) {
+    val objectSer = SparkEnv.get.serializer.newInstance()
+    val bb = objectSer.serialize(value)
+
+    out.writeInt(bb.remaining())
+    Utils.writeByteBuffer(bb, out)
+
+    // readExternal turns zero updates into null, so null must round-trip as an empty set.
+    if (accumUpdates == null) {
+      out.writeInt(0)
+    } else {
+      out.writeInt(accumUpdates.size)
+      for ((id, update) <- accumUpdates) {
+        out.writeLong(id)
+        out.writeObject(update)
+      }
+    }
+    out.writeObject(metrics)
+  }
+
+  override def readExternal(in: ObjectInput) {
+    val objectSer = SparkEnv.get.serializer.newInstance()
+
+    val blen = in.readInt()
+    val byteVal = new Array[Byte](blen)
+    in.readFully(byteVal)
+    value = objectSer.deserialize(ByteBuffer.wrap(byteVal))
+
+    val numUpdates = in.readInt()
+    // Zero updates are kept as null rather than an empty map; presumably consumers null-check
+    // this field, so the representation is left unchanged.
+    if (numUpdates == 0) {
+      accumUpdates = null
+    } else {
+      accumUpdates = Map()
+      for (_ <- 0 until numUpdates) {
+        accumUpdates(in.readLong()) = in.readObject()
+      }
+    }
+    metrics = in.readObject().asInstanceOf[TaskMetrics]
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
new file mode 100644
index 0000000..63be8ba
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.Pool
+import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+/**
+ * Low-level task scheduler interface, implemented by both ClusterScheduler and LocalScheduler.
+ * These schedulers get sets of tasks submitted to them from the DAGScheduler for each stage,
+ * and are responsible for sending the tasks to the cluster, running them, retrying if there
+ * are failures, and mitigating stragglers. They return events to the DAGScheduler through
+ * the TaskSchedulerListener interface.
+ */
+private[spark] trait TaskScheduler {
+
+  /** The scheduler's root pool. */
+  def rootPool: Pool
+
+  /** The scheduling mode this scheduler operates in. */
+  def schedulingMode: SchedulingMode
+
+  /** Starts the scheduler. */
+  def start(): Unit
+
+  /**
+   * Invoked after the system has successfully initialized (typically in the SparkContext).
+   * YARN uses this to bootstrap allocation of resources based on preferred locations, wait for
+   * slave registrations, etc. The default implementation does nothing.
+   */
+  def postStartHook(): Unit = {}
+
+  /** Disconnects from the cluster. */
+  def stop(): Unit
+
+  /** Submits a sequence of tasks to run. */
+  def submitTasks(taskSet: TaskSet): Unit
+
+  /** Sets a listener for upcalls. This is guaranteed to be set before submitTasks is called. */
+  def setListener(listener: TaskSchedulerListener): Unit
+
+  /** Gets the default level of parallelism to use in the cluster, as a hint for sizing jobs. */
+  def defaultParallelism(): Int
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
new file mode 100644
index 0000000..83be051
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerListener.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.scheduler.cluster.TaskInfo
+import scala.collection.mutable.Map
+
+import org.apache.spark.TaskEndReason
+import org.apache.spark.executor.TaskMetrics
+
+/**
+ * Interface for getting events back from the TaskScheduler.
+ */
+private[spark] trait TaskSchedulerListener {
+
+  /** A task has started. */
+  def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit
+
+  /** A task has finished or failed; `reason` tells which. */
+  def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics): Unit
+
+  /** A node (executor `execId` on `host`) was added to the cluster. */
+  def executorGained(execId: String, host: String): Unit
+
+  /** A node (executor `execId`) was lost from the cluster. */
+  def executorLost(execId: String): Unit
+
+  /** The TaskScheduler wants to abort an entire task set, for the given reason. */
+  def taskSetFailed(taskSet: TaskSet, reason: String): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
new file mode 100644
index 0000000..c3ad325
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.Properties
+
+/**
+ * A set of tasks submitted together to the low-level TaskScheduler, usually representing
+ * missing partitions of a particular stage.
+ *
+ * @param tasks the tasks in this set
+ * @param stageId id of the stage the tasks belong to
+ * @param attempt attempt number, combined with `stageId` to form `id`
+ * @param priority priority value passed along with the set
+ * @param properties properties passed along with the set
+ */
+private[spark] class TaskSet(
+    val tasks: Array[Task[_]],
+    val stageId: Int,
+    val attempt: Int,
+    val priority: Int,
+    val properties: Properties) {
+
+  /** Identifier of the form "stageId.attempt", e.g. "3.0" for attempt 0 of stage 3. */
+  val id: String = stageId.toString + "." + attempt.toString
+
+  override def toString: String = "TaskSet " + id
+}