You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:14 UTC
[5/6] [SPARK-1132] Persisting Web UI through refactoring the
SparkListener interface
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d83d034..77c558a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -32,7 +32,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -54,87 +54,53 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
*/
private[spark]
class DAGScheduler(
- taskSched: TaskScheduler,
+ taskScheduler: TaskScheduler,
+ listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv)
extends Logging {
- def this(taskSched: TaskScheduler) {
- this(taskSched, SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
- SparkEnv.get.blockManager.master, SparkEnv.get)
- }
- taskSched.setDAGScheduler(this)
+ import DAGScheduler._
- // Called by TaskScheduler to report task's starting.
- def taskStarted(task: Task[_], taskInfo: TaskInfo) {
- eventProcessActor ! BeginEvent(task, taskInfo)
- }
-
- // Called to report that a task has completed and results are being fetched remotely.
- def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
- eventProcessActor ! GettingResultEvent(task, taskInfo)
+ def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
+ this(
+ taskScheduler,
+ sc.listenerBus,
+ sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+ sc.env.blockManager.master,
+ sc.env)
}
- // Called by TaskScheduler to report task completions or failures.
- def taskEnded(
- task: Task[_],
- reason: TaskEndReason,
- result: Any,
- accumUpdates: Map[Long, Any],
- taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) {
- eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
- }
-
- // Called by TaskScheduler when an executor fails.
- def executorLost(execId: String) {
- eventProcessActor ! ExecutorLost(execId)
- }
-
- // Called by TaskScheduler when a host is added
- def executorGained(execId: String, host: String) {
- eventProcessActor ! ExecutorGained(execId, host)
- }
-
- // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
- // cancellation of the job itself.
- def taskSetFailed(taskSet: TaskSet, reason: String) {
- eventProcessActor ! TaskSetFailed(taskSet, reason)
- }
-
- // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
- // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
- // as more failure events come in
- val RESUBMIT_TIMEOUT = 200.milliseconds
-
- // The time, in millis, to wake up between polls of the completion queue in order to potentially
- // resubmit failed stages
- val POLL_TIMEOUT = 10L
-
- // Warns the user if a stage contains a task with size greater than this value (in KB)
- val TASK_SIZE_TO_WARN = 100
+ def this(sc: SparkContext) = this(sc, sc.taskScheduler)
private var eventProcessActor: ActorRef = _
private[scheduler] val nextJobId = new AtomicInteger(0)
-
- def numTotalJobs: Int = nextJobId.get()
-
+ private[scheduler] def numTotalJobs: Int = nextJobId.get()
private val nextStageId = new AtomicInteger(0)
private[scheduler] val jobIdToStageIds = new TimeStampedHashMap[Int, HashSet[Int]]
-
private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
-
private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
-
private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
-
+ private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+ private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
- // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
- private[spark] val listenerBus = new SparkListenerBus
+ // Stages we need to run whose parents aren't done
+ private[scheduler] val waitingStages = new HashSet[Stage]
+
+ // Stages we are running right now
+ private[scheduler] val runningStages = new HashSet[Stage]
+
+ // Stages that must be resubmitted due to fetch failures
+ private[scheduler] val failedStages = new HashSet[Stage]
+
+ // Missing tasks from each stage
+ private[scheduler] val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
+
+ private[scheduler] val activeJobs = new HashSet[ActiveJob]
// Contains the locations that each RDD's partitions are cached on
private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -145,22 +111,12 @@ class DAGScheduler(
//
// TODO: Garbage collect information about failure epochs when we know there are no more
// stray messages to detect.
- val failedEpoch = new HashMap[String, Long]
+ private val failedEpoch = new HashMap[String, Long]
- // stage id to the active job
- val idToActiveJob = new HashMap[Int, ActiveJob]
+ private val metadataCleaner =
+ new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
- val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
- val running = new HashSet[Stage] // Stages we are running right now
- val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
- // Missing tasks from each stage
- val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-
- val activeJobs = new HashSet[ActiveJob]
- val resultStageToJob = new HashMap[Stage, ActiveJob]
-
- val metadataCleaner = new MetadataCleaner(
- MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
+ taskScheduler.setDAGScheduler(this)
/**
* Starts the event processing actor. The actor has two responsibilities:
@@ -196,13 +152,46 @@ class DAGScheduler(
}))
}
- def addSparkListener(listener: SparkListener) {
- listenerBus.addListener(listener)
+ // Called by TaskScheduler to report task's starting.
+ def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+ eventProcessActor ! BeginEvent(task, taskInfo)
+ }
+
+ // Called to report that a task has completed and results are being fetched remotely.
+ def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
+ eventProcessActor ! GettingResultEvent(task, taskInfo)
+ }
+
+ // Called by TaskScheduler to report task completions or failures.
+ def taskEnded(
+ task: Task[_],
+ reason: TaskEndReason,
+ result: Any,
+ accumUpdates: Map[Long, Any],
+ taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics) {
+ eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
+ }
+
+ // Called by TaskScheduler when an executor fails.
+ def executorLost(execId: String) {
+ eventProcessActor ! ExecutorLost(execId)
+ }
+
+ // Called by TaskScheduler when a host is added
+ def executorAdded(execId: String, host: String) {
+ eventProcessActor ! ExecutorAdded(execId, host)
+ }
+
+ // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
+ // cancellation of the job itself.
+ def taskSetFailed(taskSet: TaskSet, reason: String) {
+ eventProcessActor ! TaskSetFailed(taskSet, reason)
}
private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
if (!cacheLocs.contains(rdd.id)) {
- val blockIds = rdd.partitions.indices.map(index=> RDDBlockId(rdd.id, index)).toArray[BlockId]
+ val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
cacheLocs(rdd.id) = blockIds.map { id =>
locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
@@ -250,7 +239,7 @@ class DAGScheduler(
new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
- stageToInfos(stage) = new StageInfo(stage)
+ stageToInfos(stage) = StageInfo.fromStage(stage)
stage
}
@@ -376,9 +365,9 @@ class DAGScheduler(
def removeStage(stageId: Int) {
// data structures based on Stage
for (stage <- stageIdToStage.get(stageId)) {
- if (running.contains(stage)) {
+ if (runningStages.contains(stage)) {
logDebug("Removing running stage %d".format(stageId))
- running -= stage
+ runningStages -= stage
}
stageToInfos -= stage
for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
@@ -388,13 +377,13 @@ class DAGScheduler(
logDebug("Removing pending status for stage %d".format(stageId))
}
pendingTasks -= stage
- if (waiting.contains(stage)) {
+ if (waitingStages.contains(stage)) {
logDebug("Removing stage %d from waiting set.".format(stageId))
- waiting -= stage
+ waitingStages -= stage
}
- if (failed.contains(stage)) {
+ if (failedStages.contains(stage)) {
logDebug("Removing stage %d from failed set.".format(stageId))
- failed -= stage
+ failedStages -= stage
}
}
// data structures based on StageId
@@ -544,13 +533,14 @@ class DAGScheduler(
logInfo("Missing parents: " + getMissingParentStages(finalStage))
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// Compute very short actions like first() or take() with no parent stages locally.
- listenerBus.post(SparkListenerJobStart(job, Array(), properties))
+ listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
- idToActiveJob(jobId) = job
+ stageIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
- listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
+ listenerBus.post(
+ SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
submitStage(finalStage)
}
@@ -563,23 +553,23 @@ class DAGScheduler(
val activeInGroup = activeJobs.filter(activeJob =>
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
val jobIds = activeInGroup.map(_.jobId)
- jobIds.foreach { handleJobCancellation }
+ jobIds.foreach(handleJobCancellation)
case AllJobsCancelled =>
// Cancel all running jobs.
- running.map(_.jobId).foreach { handleJobCancellation }
+ runningStages.map(_.jobId).foreach(handleJobCancellation)
activeJobs.clear() // These should already be empty by this point,
- idToActiveJob.clear() // but just in case we lost track of some jobs...
+ stageIdToActiveJob.clear() // but just in case we lost track of some jobs...
- case ExecutorGained(execId, host) =>
- handleExecutorGained(execId, host)
+ case ExecutorAdded(execId, host) =>
+ handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
handleExecutorLost(execId)
case BeginEvent(task, taskInfo) =>
for (
- job <- idToActiveJob.get(task.stageId);
+ job <- stageIdToActiveJob.get(task.stageId);
stage <- stageIdToStage.get(task.stageId);
stageInfo <- stageToInfos.get(stage)
) {
@@ -591,20 +581,22 @@ class DAGScheduler(
task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
}
}
- listenerBus.post(SparkListenerTaskStart(task, taskInfo))
+ listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
case GettingResultEvent(task, taskInfo) =>
- listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
+ listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
- listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
+ val stageId = task.stageId
+ val taskType = Utils.getFormattedClassName(task)
+ listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason) =>
stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
case ResubmitFailedStages =>
- if (failed.size > 0) {
+ if (failedStages.size > 0) {
// Failed stages may be removed by job cancellation, so failed might be empty even if
// the ResubmitFailedStages event has been scheduled.
resubmitFailedStages()
@@ -615,7 +607,7 @@ class DAGScheduler(
for (job <- activeJobs) {
val error = new SparkException("Job cancelled because SparkContext was shut down")
job.listener.jobFailed(error)
- listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None)))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1)))
}
return true
}
@@ -629,9 +621,9 @@ class DAGScheduler(
private[scheduler] def resubmitFailedStages() {
logInfo("Resubmitting failed stages")
clearCacheLocs()
- val failed2 = failed.toArray
- failed.clear()
- for (stage <- failed2.sortBy(_.jobId)) {
+ val failedStagesCopy = failedStages.toArray
+ failedStages.clear()
+ for (stage <- failedStagesCopy.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -644,12 +636,12 @@ class DAGScheduler(
// TODO: We might want to run this less often, when we are sure that something has become
// runnable that wasn't before.
logTrace("Checking for newly runnable parent stages")
- logTrace("running: " + running)
- logTrace("waiting: " + waiting)
- logTrace("failed: " + failed)
- val waiting2 = waiting.toArray
- waiting.clear()
- for (stage <- waiting2.sortBy(_.jobId)) {
+ logTrace("running: " + runningStages)
+ logTrace("waiting: " + waitingStages)
+ logTrace("failed: " + failedStages)
+ val waitingStagesCopy = waitingStages.toArray
+ waitingStages.clear()
+ for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
submitStage(stage)
}
}
@@ -685,7 +677,7 @@ class DAGScheduler(
}
} catch {
case e: Exception =>
- jobResult = JobFailed(e, Some(job.finalStage))
+ jobResult = JobFailed(e, job.finalStage.id)
job.listener.jobFailed(e)
} finally {
val s = job.finalStage
@@ -693,7 +685,7 @@ class DAGScheduler(
stageIdToStage -= s.id // but that won't get cleaned up via the normal paths through
stageToInfos -= s // completion events or stage abort
jobIdToStageIds -= job.jobId
- listenerBus.post(SparkListenerJobEnd(job, jobResult))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
}
}
@@ -705,7 +697,7 @@ class DAGScheduler(
private def activeJobForStage(stage: Stage): Option[Int] = {
if (stageIdToJobIds.contains(stage.id)) {
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
- jobsThatUseStage.find(idToActiveJob.contains(_))
+ jobsThatUseStage.find(stageIdToActiveJob.contains)
} else {
None
}
@@ -716,18 +708,18 @@ class DAGScheduler(
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
- if (!waiting(stage) && !running(stage) && !failed(stage)) {
+ if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
- running += stage
+ runningStages += stage
} else {
for (parent <- missing) {
submitStage(parent)
}
- waiting += stage
+ waitingStages += stage
}
}
} else {
@@ -758,8 +750,8 @@ class DAGScheduler(
}
}
- val properties = if (idToActiveJob.contains(jobId)) {
- idToActiveJob(stage.jobId).properties
+ val properties = if (stageIdToActiveJob.contains(jobId)) {
+ stageIdToActiveJob(stage.jobId).properties
} else {
//this stage will be assigned to "default" pool
null
@@ -779,20 +771,20 @@ class DAGScheduler(
} catch {
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString)
- running -= stage
+ runningStages -= stage
return
}
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
myPending ++= tasks
logDebug("New pending tasks: " + myPending)
- taskSched.submitTasks(
+ taskScheduler.submitTasks(
new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
} else {
logDebug("Stage " + stage + " is actually done; %b %d %d".format(
stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
- running -= stage
+ runningStages -= stage
}
}
@@ -817,7 +809,7 @@ class DAGScheduler(
logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
- running -= stage
+ runningStages -= stage
}
event.reason match {
case Success =>
@@ -826,7 +818,6 @@ class DAGScheduler(
Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
}
pendingTasks(stage) -= task
- stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
task match {
case rt: ResultTask[_, _] =>
resultStageToJob.get(stage) match {
@@ -836,12 +827,12 @@ class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
- idToActiveJob -= stage.jobId
+ stageIdToActiveJob -= stage.jobId
activeJobs -= job
resultStageToJob -= stage
markStageAsFinished(stage)
jobIdToStageIdsRemove(job.jobId)
- listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
}
job.listener.taskSucceeded(rt.outputId, event.result)
}
@@ -858,12 +849,12 @@ class DAGScheduler(
} else {
stage.addOutputLoc(smt.partitionId, status)
}
- if (running.contains(stage) && pendingTasks(stage).isEmpty) {
+ if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
markStageAsFinished(stage)
logInfo("looking for newly runnable stages")
- logInfo("running: " + running)
- logInfo("waiting: " + waiting)
- logInfo("failed: " + failed)
+ logInfo("running: " + runningStages)
+ logInfo("waiting: " + waitingStages)
+ logInfo("failed: " + failedStages)
if (stage.shuffleDep.isDefined) {
// We supply true to increment the epoch number here in case this is a
// recomputation of the map outputs. In that case, some nodes may have cached
@@ -886,14 +877,14 @@ class DAGScheduler(
submitStage(stage)
} else {
val newlyRunnable = new ArrayBuffer[Stage]
- for (stage <- waiting) {
+ for (stage <- waitingStages) {
logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
}
- for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+ for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
newlyRunnable += stage
}
- waiting --= newlyRunnable
- running ++= newlyRunnable
+ waitingStages --= newlyRunnable
+ runningStages ++= newlyRunnable
for {
stage <- newlyRunnable.sortBy(_.id)
jobId <- activeJobForStage(stage)
@@ -912,7 +903,7 @@ class DAGScheduler(
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
// Mark the stage that the reducer was in as unrunnable
val failedStage = stageIdToStage(task.stageId)
- running -= failedStage
+ runningStages -= failedStage
// TODO: Cancel running tasks in the stage
logInfo("Marking " + failedStage + " (" + failedStage.name +
") for resubmision due to a fetch failure")
@@ -924,7 +915,7 @@ class DAGScheduler(
}
logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
"); marking it for resubmission")
- if (failed.isEmpty && eventProcessActor != null) {
+ if (failedStages.isEmpty && eventProcessActor != null) {
// Don't schedule an event to resubmit failed stages if failed isn't empty, because
// in that case the event will already have been scheduled. eventProcessActor may be
// null during unit tests.
@@ -932,8 +923,8 @@ class DAGScheduler(
env.actorSystem.scheduler.scheduleOnce(
RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
}
- failed += failedStage
- failed += mapStage
+ failedStages += failedStage
+ failedStages += mapStage
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleExecutorLost(bmAddress.executorId, Some(task.epoch))
@@ -980,10 +971,10 @@ class DAGScheduler(
}
}
- private def handleExecutorGained(execId: String, host: String) {
+ private def handleExecutorAdded(execId: String, host: String) {
// remove from failedEpoch(execId) ?
if (failedEpoch.contains(execId)) {
- logInfo("Host gained which was in lost list earlier: " + host)
+ logInfo("Host added was in lost list earlier: " + host)
failedEpoch -= execId
}
}
@@ -993,14 +984,14 @@ class DAGScheduler(
logDebug("Trying to cancel unregistered job " + jobId)
} else {
val independentStages = removeJobAndIndependentStages(jobId)
- independentStages.foreach { taskSched.cancelTasks }
+ independentStages.foreach(taskScheduler.cancelTasks)
val error = new SparkException("Job %d cancelled".format(jobId))
- val job = idToActiveJob(jobId)
+ val job = stageIdToActiveJob(jobId)
job.listener.jobFailed(error)
jobIdToStageIds -= jobId
activeJobs -= job
- idToActiveJob -= jobId
- listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
+ stageIdToActiveJob -= jobId
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
}
}
@@ -1020,10 +1011,10 @@ class DAGScheduler(
val error = new SparkException("Job aborted: " + reason)
job.listener.jobFailed(error)
jobIdToStageIdsRemove(job.jobId)
- idToActiveJob -= resultStage.jobId
+ stageIdToActiveJob -= resultStage.jobId
activeJobs -= job
resultStageToJob -= resultStage
- listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
+ listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
}
if (dependentStages.isEmpty) {
logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -1102,11 +1093,11 @@ class DAGScheduler(
"stageToInfos" -> stageToInfos,
"jobIdToStageIds" -> jobIdToStageIds,
"stageIdToJobIds" -> stageIdToJobIds).
- foreach { case(s, t) => {
- val sizeBefore = t.size
- t.clearOldValues(cleanupTime)
- logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
- }}
+ foreach { case (s, t) =>
+ val sizeBefore = t.size
+ t.clearOldValues(cleanupTime)
+ logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
+ }
}
def stop() {
@@ -1114,7 +1105,20 @@ class DAGScheduler(
eventProcessActor ! StopDAGScheduler
}
metadataCleaner.cancel()
- taskSched.stop()
- listenerBus.stop()
+ taskScheduler.stop()
}
}
+
+/** Scheduling constants shared by every [[DAGScheduler]] instance. */
+private[spark] object DAGScheduler {
+  /**
+   * How long to wait for fetch failure events to stop coming in after one is detected, before a
+   * ResubmitFailedStages event is scheduled. This is a simplistic way to avoid resubmitting tasks
+   * in the non-fetchable map stage one by one as more failure events come in.
+   */
+  val RESUBMIT_TIMEOUT = 200.milliseconds
+
+  /**
+   * The time, in milliseconds, to wake up between polls of the completion queue in order to
+   * potentially resubmit failed stages.
+   */
+  val POLL_TIMEOUT: Long = 10L
+
+  /** Warn the user if a stage contains a task whose serialized size exceeds this value (in KB). */
+  val TASK_SIZE_TO_WARN: Int = 100
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 39cd98e..04c53d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,7 +65,7 @@ private[scheduler] case class CompletionEvent(
taskMetrics: TaskMetrics)
extends DAGSchedulerEvent
-private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index b52fe24..5878e73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -28,15 +28,15 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.failed.size
+ override def getValue: Int = dagScheduler.failedStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.running.size
+ override def getValue: Int = dagScheduler.runningStages.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
- override def getValue: Int = dagScheduler.waiting.size
+ override def getValue: Int = dagScheduler.waitingStages.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
new file mode 100644
index 0000000..217f882
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, FileLogger}
+
+/**
+ * A SparkListener that logs events to persistent storage, one compact JSON line per event.
+ *
+ * Event logging is specified by the following configurable parameters:
+ *   spark.eventLog.enabled - Whether event logging is enabled (not read by this class itself).
+ *   spark.eventLog.compress - Whether to compress logged events (default: false).
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files (default: false).
+ *   spark.eventLog.dir - Path to the directory in which events are logged
+ *     (default: /tmp/spark-events).
+ *   spark.eventLog.buffer.kb - Buffer size, in KB, to use when writing to output streams
+ *     (default: 100).
+ *
+ * Events are written under a per-application subdirectory of spark.eventLog.dir whose name is the
+ * application name (spaces, colons and slashes replaced by dashes, lower-cased) followed by a dash
+ * and the time at which this listener was constructed, in milliseconds.
+ *
+ * @param appName name of the application; used to derive the log directory name
+ * @param conf configuration from which the spark.eventLog.* settings are read
+ */
+private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+  extends SparkListener with Logging {
+
+  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
+  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  // Lower-case with Locale.ROOT so the directory name does not depend on the JVM's default locale
+  // (e.g. under a Turkish locale "I" would otherwise become a dotless "ı").
+  private val name = appName.replaceAll("[ :/]", "-").toLowerCase(java.util.Locale.ROOT) +
+    "-" + System.currentTimeMillis
+
+  /** Full path of the directory this listener writes its events to. */
+  val logDir = logBaseDir + "/" + name
+
+  private val logger =
+    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+
+  /**
+   * Information needed to replay the events logged by this listener later. The codec name comes
+   * from spark.io.compression.codec and is only recorded when compression is enabled.
+   */
+  val info = {
+    val compressionCodec = if (shouldCompress) {
+      Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
+    } else {
+      None
+    }
+    EventLoggingInfo(logDir, compressionCodec)
+  }
+
+  logInfo("Logging events to %s".format(logDir))
+
+  /** Log the event as compact JSON, flushing the underlying logger when `flushLogger` is set. */
+  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
+    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
+    logger.logLine(eventJson)
+    if (flushLogger) {
+      logger.flush()
+    }
+  }
+
+  // Events that do not trigger a flush
+  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
+    logEvent(event)
+  override def onTaskStart(event: SparkListenerTaskStart): Unit =
+    logEvent(event)
+  override def onTaskGettingResult(event: SparkListenerTaskGettingResult): Unit =
+    logEvent(event)
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit =
+    logEvent(event)
+  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit =
+    logEvent(event)
+
+  // Events that trigger a flush
+  override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
+    logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart): Unit =
+    logEvent(event, flushLogger = true)
+  override def onJobEnd(event: SparkListenerJobEnd): Unit =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved): Unit =
+    logEvent(event, flushLogger = true)
+  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit =
+    logEvent(event, flushLogger = true)
+
+  /** Stop the underlying file logger. */
+  def stop() = logger.stop()
+}
+
+/**
+ * Information needed to replay the events written by an EventLoggingListener.
+ *
+ * @param logDir the directory the events were logged to
+ * @param compressionCodec name of the codec the events were compressed with, or None if
+ *                         compression is not enabled
+ */
+private[spark] case class EventLoggingInfo(
+    logDir: String,
+    compressionCodec: Option[String])
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 01cbcc3..b3a67d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -22,24 +22,25 @@ import java.text.SimpleDateFormat
import java.util.{Date, Properties}
import java.util.concurrent.LinkedBlockingQueue
-import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
+import scala.collection.mutable.HashMap
import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
/**
* A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
- * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
- * after the SparkContext is created.
- * Note that each JobLogger only works for one SparkContext
- * @param logDirName The base directory for the log files.
+ * for each Spark job, containing task start/stop and shuffle information. JobLogger is a subclass
+ * of SparkListener; use addSparkListener to add a JobLogger to a SparkContext after the
+ * SparkContext is created. Note that each JobLogger only works for one SparkContext.
+ *
+ * NOTE: The functionality of this class is heavily stripped down to accommodate a general
+ * refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced
+ * to log application information as SparkListenerEvents. To enable this functionality, set
+ * spark.eventLog.enabled to true.
*/
-class JobLogger(val user: String, val logDirName: String)
- extends SparkListener with Logging {
+@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
+class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {
def this() = this(System.getProperty("user.name", "<unknown>"),
String.valueOf(System.currentTimeMillis()))
@@ -51,19 +52,19 @@ class JobLogger(val user: String, val logDirName: String)
"/tmp/spark-%s".format(user)
}
- private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
- private val stageIDToJobID = new HashMap[Int, Int]
- private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+ private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
+ private val stageIdToJobId = new HashMap[Int, Int]
+ private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+ private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
createLogDir()
// The following 5 functions are used only in testing.
private[scheduler] def getLogDir = logDir
- private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter
- private[scheduler] def getStageIDToJobID = stageIDToJobID
- private[scheduler] def getJobIDToStages = jobIDToStages
+ private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
+ private[scheduler] def getStageIdToJobId = stageIdToJobId
+ private[scheduler] def getJobIdToStageIds = jobIdToStageIds
private[scheduler] def getEventQueue = eventQueue
/** Create a folder for log files, the folder's name is the creation time of jobLogger */
@@ -80,187 +81,78 @@ class JobLogger(val user: String, val logDirName: String)
/**
* Create a log file for one job
- * @param jobID ID of the job
+ * @param jobId ID of the job
* @throws FileNotFoundException Fail to create log file
*/
- protected def createLogWriter(jobID: Int) {
+ protected def createLogWriter(jobId: Int) {
try {
- val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
- jobIDToPrintWriter += (jobID -> fileWriter)
+ val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobId)
+ jobIdToPrintWriter += (jobId -> fileWriter)
} catch {
case e: FileNotFoundException => e.printStackTrace()
}
}
/**
- * Close log file, and clean the stage relationship in stageIDToJobID
- * @param jobID ID of the job
+ * Close log file, and clean the stage relationship in stageIdToJobId
+ * @param jobId ID of the job
*/
- protected def closeLogWriter(jobID: Int) {
- jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+ protected def closeLogWriter(jobId: Int) {
+ jobIdToPrintWriter.get(jobId).foreach { fileWriter =>
fileWriter.close()
- jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
- stageIDToJobID -= stage.id
+ jobIdToStageIds.get(jobId).foreach(_.foreach { stageId =>
+ stageIdToJobId -= stageId
})
- jobIDToPrintWriter -= jobID
- jobIDToStages -= jobID
+ jobIdToPrintWriter -= jobId
+ jobIdToStageIds -= jobId
}
}
/**
+ * Build up the maps that represent stage-job relationships
+ * @param jobId ID of the job
+ * @param stageIds IDs of the associated stages
+ */
+ protected def buildJobStageDependencies(jobId: Int, stageIds: Seq[Int]) = {
+ jobIdToStageIds(jobId) = stageIds
+ stageIds.foreach { stageId => stageIdToJobId(stageId) = jobId }
+ }
+
+ /**
* Write info into log file
- * @param jobID ID of the job
+ * @param jobId ID of the job
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
*/
- protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+ protected def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true) {
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " + info
}
- jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+ jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
}
/**
* Write info into log file
- * @param stageID ID of the stage
+ * @param stageId ID of the stage
* @param info Info to be recorded
* @param withTime Controls whether to record time stamp before the info, default is true
*/
- protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
- stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
- }
-
- /**
- * Build stage dependency for a job
- * @param jobID ID of the job
- * @param stage Root stage of the job
- */
- protected def buildJobDep(jobID: Int, stage: Stage) {
- if (stage.jobId == jobID) {
- jobIDToStages.get(jobID) match {
- case Some(stageList) => stageList += stage
- case None => val stageList = new ListBuffer[Stage]
- stageList += stage
- jobIDToStages += (jobID -> stageList)
- }
- stageIDToJobID += (stage.id -> jobID)
- stage.parents.foreach(buildJobDep(jobID, _))
- }
- }
-
- /**
- * Record stage dependency and RDD dependency for a stage
- * @param jobID Job ID of the stage
- */
- protected def recordStageDep(jobID: Int) {
- def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
- var rddList = new ListBuffer[RDD[_]]
- rddList += rdd
- rdd.dependencies.foreach {
- case shufDep: ShuffleDependency[_, _] =>
- case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd)
- }
- rddList
- }
- jobIDToStages.get(jobID).foreach {_.foreach { stage =>
- var depRddDesc: String = ""
- getRddsInStage(stage.rdd).foreach { rdd =>
- depRddDesc += rdd.id + ","
- }
- var depStageDesc: String = ""
- stage.parents.foreach { stage =>
- depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
- }
- jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" +
- depRddDesc.substring(0, depRddDesc.length - 1) + ")" +
- " STAGE_DEP=" + depStageDesc, false)
- }
- }
- }
-
- /**
- * Generate indents and convert to String
- * @param indent Number of indents
- * @return string of indents
- */
- protected def indentString(indent: Int): String = {
- val sb = new StringBuilder()
- for (i <- 1 to indent) {
- sb.append(" ")
- }
- sb.toString()
- }
-
- /**
- * Get RDD's name
- * @param rdd Input RDD
- * @return String of RDD's name
- */
- protected def getRddName(rdd: RDD[_]): String = {
- var rddName = rdd.getClass.getSimpleName
- if (rdd.name != null) {
- rddName = rdd.name
- }
- rddName
- }
-
- /**
- * Record RDD dependency graph in a stage
- * @param jobID Job ID of the stage
- * @param rdd Root RDD of the stage
- * @param indent Indent number before info
- */
- protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
- val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
- val rddInfo =
- s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
- s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
- jobLogInfo(jobID, indentString(indent) + rddInfo, false)
- rdd.dependencies.foreach {
- case shufDep: ShuffleDependency[_, _] =>
- val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
- jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
- case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
- }
- }
-
- /**
- * Record stage dependency graph of a job
- * @param jobID Job ID of the stage
- * @param stage Root stage of the job
- * @param indent Indent number before info, default is 0
- */
- protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
- {
- val stageInfo = if (stage.isShuffleMap) {
- "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
- } else {
- "STAGE_ID=" + stage.id + " RESULT_STAGE"
- }
- if (stage.jobId == jobID) {
- jobLogInfo(jobID, indentString(indent) + stageInfo, false)
- if (!idSet.contains(stage.id)) {
- idSet += stage.id
- recordRddInStageGraph(jobID, stage.rdd, indent)
- stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
- }
- } else {
- jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
- }
+ protected def stageLogInfo(stageId: Int, info: String, withTime: Boolean = true) {
+ stageIdToJobId.get(stageId).foreach(jobId => jobLogInfo(jobId, info, withTime))
}
/**
* Record task metrics into job log files, including execution info and shuffle metrics
- * @param stageID Stage ID of the task
+ * @param stageId Stage ID of the task
* @param status Status info of the task
* @param taskInfo Task description info
* @param taskMetrics Task running metrics
*/
- protected def recordTaskMetrics(stageID: Int, status: String,
+ protected def recordTaskMetrics(stageId: Int, status: String,
taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
- val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
+ val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageId +
" START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
" EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname
val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
@@ -278,7 +170,7 @@ class JobLogger(val user: String, val logDirName: String)
case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
case None => ""
}
- stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+ stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
}
/**
@@ -286,8 +178,9 @@ class JobLogger(val user: String, val logDirName: String)
* @param stageSubmitted Stage submitted event
*/
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
- stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
- stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
+ val stageInfo = stageSubmitted.stageInfo
+ stageLogInfo(stageInfo.stageId, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+ stageInfo.stageId, stageInfo.numTasks))
}
/**
@@ -295,36 +188,30 @@ class JobLogger(val user: String, val logDirName: String)
* @param stageCompleted Stage completed event
*/
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
- stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
- stageCompleted.stage.stageId))
+ val stageId = stageCompleted.stageInfo.stageId
+ stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
}
- override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
/**
* When task ends, record task completion status and metrics
* @param taskEnd Task end event
*/
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
- val task = taskEnd.task
val taskInfo = taskEnd.taskInfo
- var taskStatus = ""
- task match {
- case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
- case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
- }
+ var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
+ val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty()
taskEnd.reason match {
case Success => taskStatus += " STATUS=SUCCESS"
- recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
+ recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
case Resubmitted =>
taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
- " STAGE_ID=" + task.stageId
- stageLogInfo(task.stageId, taskStatus)
+ " STAGE_ID=" + taskEnd.stageId
+ stageLogInfo(taskEnd.stageId, taskStatus)
case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
- task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
+ taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
mapId + " REDUCE_ID=" + reduceId
- stageLogInfo(task.stageId, taskStatus)
+ stageLogInfo(taskEnd.stageId, taskStatus)
case _ =>
}
}
@@ -334,8 +221,8 @@ class JobLogger(val user: String, val logDirName: String)
* @param jobEnd Job end event
*/
override def onJobEnd(jobEnd: SparkListenerJobEnd) {
- val job = jobEnd.job
- var info = "JOB_ID=" + job.jobId
+ val jobId = jobEnd.jobId
+ var info = "JOB_ID=" + jobId
jobEnd.jobResult match {
case JobSucceeded => info += " STATUS=SUCCESS"
case JobFailed(exception, _) =>
@@ -343,19 +230,19 @@ class JobLogger(val user: String, val logDirName: String)
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
}
- jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
- closeLogWriter(job.jobId)
+ jobLogInfo(jobId, info.substring(0, info.length - 1).toUpperCase)
+ closeLogWriter(jobId)
}
/**
* Record job properties into job log file
- * @param jobID ID of the job
+ * @param jobId ID of the job
* @param properties Properties of the job
*/
- protected def recordJobProperties(jobID: Int, properties: Properties) {
- if(properties != null) {
+ protected def recordJobProperties(jobId: Int, properties: Properties) {
+ if (properties != null) {
val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
- jobLogInfo(jobID, description, false)
+ jobLogInfo(jobId, description, false)
}
}
@@ -364,14 +251,11 @@ class JobLogger(val user: String, val logDirName: String)
* @param jobStart Job start event
*/
override def onJobStart(jobStart: SparkListenerJobStart) {
- val job = jobStart.job
+ val jobId = jobStart.jobId
val properties = jobStart.properties
- createLogWriter(job.jobId)
- recordJobProperties(job.jobId, properties)
- buildJobDep(job.jobId, job.finalStage)
- recordStageDep(job.jobId)
- recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
- jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
+ createLogWriter(jobId)
+ recordJobProperties(jobId, properties)
+ buildJobStageDependencies(jobId, jobStart.stageIds)
+ jobLogInfo(jobId, "JOB_ID=" + jobId + " STATUS=STARTED")
}
}
-
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index d94f6ad..3cf4e30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -23,5 +23,6 @@ package org.apache.spark.scheduler
private[spark] sealed trait JobResult
private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage])
- extends JobResult
+
+// A failed stage ID of -1 means there is not a particular stage that caused the failure
+private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index b026f86..8007b54 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -64,7 +64,7 @@ private[spark] class JobWaiter[T](
override def jobFailed(exception: Exception): Unit = synchronized {
_jobFinished = true
- jobResult = JobFailed(exception, None)
+ jobResult = JobFailed(exception, -1)
this.notifyAll()
}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
new file mode 100644
index 0000000..353a486
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.spark.Logging
+
+/**
+ * Asynchronously passes SparkListenerEvents to registered SparkListeners.
+ *
+ * Until start() is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
+ */
+private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  /* Number of events accepted by post() that the listener thread has not finished handling.
+   * Unlike eventQueue.isEmpty, this stays positive while an event that has already been taken
+   * off the queue is still being delivered to the listeners, which is exactly the state
+   * waitUntilEmpty() must not mistake for "drained". */
+  private val pendingEvents = new java.util.concurrent.atomic.AtomicInteger(0)
+
+  private var queueFullErrorMessageLogged = false
+  private var started = false
+
+  /**
+   * Start sending events to attached listeners.
+   *
+   * This first sends out all buffered events posted before this listener bus has started, then
+   * listens for any additional events asynchronously while the listener bus is still running.
+   * This should only be called once.
+   *
+   * @throws IllegalStateException if this listener bus has already been started
+   */
+  def start() {
+    if (started) {
+      throw new IllegalStateException("Listener bus already started!")
+    }
+    started = true
+    new Thread("SparkListenerBus") {
+      setDaemon(true)
+      override def run() {
+        while (true) {
+          val event = eventQueue.take()
+          try {
+            if (event == SparkListenerShutdown) {
+              // Get out of the while loop and shutdown the daemon thread
+              return
+            }
+            postToAll(event)
+          } finally {
+            // Only at this point is the event fully handled (see pendingEvents).
+            pendingEvents.decrementAndGet()
+          }
+        }
+      }
+    }.start()
+  }
+
+  /**
+   * Enqueue an event for asynchronous delivery to all attached listeners. If the queue is full
+   * the event is dropped, and an error is logged the first time that happens.
+   */
+  def post(event: SparkListenerEvent) {
+    // Count the event before the listener thread can possibly see it, so pendingEvents can never
+    // reach zero while this event is still in flight.
+    pendingEvents.incrementAndGet()
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded) {
+      // The event was dropped, so it will never be handled; undo the count.
+      pendingEvents.decrementAndGet()
+      if (!queueFullErrorMessageLogged) {
+        logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+          "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+          "rate at which tasks are being started by the scheduler.")
+        queueFullErrorMessageLogged = true
+      }
+    }
+  }
+
+  /**
+   * Waits until every posted event has been taken off the queue AND delivered to all listeners,
+   * or until the specified time has elapsed. Used for testing only.
+   *
+   * @param timeoutMillis maximum time to wait, in milliseconds
+   * @return true if all posted events were handled in time, false if the specified time elapsed
+   *         before that happened
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (pendingEvents.get > 0) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    true
+  }
+
+  /**
+   * Stop the listener thread by posting SparkListenerShutdown; events queued before it are still
+   * delivered first.
+   *
+   * @throws IllegalStateException if this listener bus was never started
+   */
+  def stop() {
+    if (!started) {
+      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
+    }
+    post(SparkListenerShutdown)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 4bc13c2..187672c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -62,7 +62,7 @@ private[spark] class Pool(
override def addSchedulable(schedulable: Schedulable) {
schedulableQueue += schedulable
schedulableNameToSchedulable(schedulable.name) = schedulable
- schedulable.parent= this
+ schedulable.parent = this
}
override def removeSchedulable(schedulable: Schedulable) {
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
new file mode 100644
index 0000000..db76178
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.InputStream
+import java.net.URI
+
+import scala.io.Source
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * An EventBus that replays logged events from persisted storage
+ */
+private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {
+  private val compressed = conf.getBoolean("spark.eventLog.compress", false)
+
+  // Only used if compression is enabled
+  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+
+  /**
+   * Return the paths of the regular files directly inside the given directory, sorted by file
+   * name. Returns an empty array (after logging a warning) if the path does not exist, is not a
+   * directory, or contains no files.
+   */
+  private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = {
+    val path = new Path(logDir)
+    if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
+      logWarning("Log path provided is not a valid directory: %s".format(logDir))
+      return Array[Path]()
+    }
+    val logStatus = fileSystem.listStatus(path)
+    if (logStatus == null || !logStatus.exists(!_.isDir)) {
+      logWarning("Log path provided contains no log files: %s".format(logDir))
+      return Array[Path]()
+    }
+    logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName)
+  }
+
+  /**
+   * Replay each event in the order maintained in the given logs.
+   *
+   * Log files are read in file-name order. Each line is parsed as one JSON-encoded event and
+   * posted to all attached listeners. If a file fails to parse, the error and offending line are
+   * logged, the rest of that file is skipped, and replay continues with the next file.
+   *
+   * @param logDir directory (parsed as a URI for the Hadoop FileSystem) containing the event logs
+   * @return false if no log files were found, true otherwise
+   */
+  def replay(logDir: String): Boolean = {
+    // The FileSystem is deliberately NOT closed here: Utils.getHadoopFileSystem presumably goes
+    // through Hadoop's FileSystem.get, which returns a process-wide cached instance, and closing it
+    // would break any other code in the process using that instance.
+    // TODO(review): confirm against Utils.getHadoopFileSystem.
+    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+    val logPaths = getLogFilePaths(logDir, fileSystem)
+    if (logPaths.isEmpty) {
+      return false
+    }
+
+    logPaths.foreach { path =>
+      // Keep track of input streams at all levels to close them later
+      // This is necessary because an exception can occur in between stream initializations
+      var fileStream: Option[InputStream] = None
+      var bufferedStream: Option[InputStream] = None
+      var compressStream: Option[InputStream] = None
+      var currentLine = ""
+      try {
+        currentLine = "<not started>"
+        fileStream = Some(fileSystem.open(path))
+        bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
+        compressStream =
+          if (compressed) {
+            Some(compressionCodec.compressedInputStream(bufferedStream.get))
+          } else bufferedStream
+
+        // Parse each line as an event and post it to all attached listeners
+        val lines = Source.fromInputStream(compressStream.get).getLines()
+        lines.foreach { line =>
+          currentLine = line
+          postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+        }
+      } catch {
+        case e: Exception =>
+          logError("Exception in parsing Spark event log %s".format(path), e)
+          logError("Malformed line: %s\n".format(currentLine))
+      } finally {
+        // Close from the outermost wrapper inwards, so each layer is closed while the stream
+        // beneath it is still open. When uncompressed, compressStream and bufferedStream are the
+        // same object; closing an already-closed stream has no effect per java.io.Closeable.
+        compressStream.foreach(_.close())
+        bufferedStream.foreach(_.close())
+        fileStream.foreach(_.close())
+      }
+    }
+    true
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 9590c03..d4eb0ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,33 +19,52 @@ package org.apache.spark.scheduler
import java.util.Properties
+import scala.collection.Map
+import scala.collection.mutable
+
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{Distribution, Utils}
-sealed trait SparkListenerEvents
+sealed trait SparkListenerEvent
+
+case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
+ extends SparkListenerEvent
+
+case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
+
+case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
-case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
- extends SparkListenerEvents
+case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
-case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskEnd(
+ stageId: Int,
+ taskType: String,
+ reason: TaskEndReason,
+ taskInfo: TaskInfo,
+ taskMetrics: TaskMetrics)
+ extends SparkListenerEvent
-case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
+ extends SparkListenerEvent
-case class SparkListenerTaskGettingResult(
- task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
-case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
- taskMetrics: TaskMetrics) extends SparkListenerEvents
+case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
+ extends SparkListenerEvent
-case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int],
- properties: Properties = null) extends SparkListenerEvents
+case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+ extends SparkListenerEvent
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
- extends SparkListenerEvents
+case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+ extends SparkListenerEvent
+
+case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
/** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+private[spark] case object SparkListenerShutdown extends SparkListenerEvent
+
/**
* Interface for listening to events from the Spark scheduler.
@@ -87,97 +106,134 @@ trait SparkListener {
*/
def onJobEnd(jobEnd: SparkListenerJobEnd) { }
+ /**
+ * Called when environment properties have been updated
+ */
+ def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
+
+ /**
+ * Called when a new block manager has joined
+ */
+ def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }
+
+ /**
+ * Called when an existing block manager has been removed
+ */
+ def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }
+
+ /**
+ * Called when an RDD is manually unpersisted by the application
+ */
+ def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
}
/**
* Simple SparkListener that logs a few summary statistics when each stage completes
*/
class StatsReportListener extends SparkListener with Logging {
+
+ import org.apache.spark.scheduler.StatsReportListener._
+
+ private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+ override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+ val info = taskEnd.taskInfo
+ val metrics = taskEnd.taskMetrics
+ if (info != null && metrics != null) {
+ taskInfoMetrics += ((info, metrics))
+ }
+ }
+
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
- import org.apache.spark.scheduler.StatsReportListener._
implicit val sc = stageCompleted
- this.logInfo("Finished stage: " + stageCompleted.stage)
- showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
+ this.logInfo("Finished stage: " + stageCompleted.stageInfo)
+ showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
- //shuffle write
+ // Shuffle write
showBytesDistribution("shuffle bytes written:",
- (_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))
+ (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
- //fetch & io
+ // Fetch & I/O
showMillisDistribution("fetch wait time:",
- (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
+ (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
showBytesDistribution("remote bytes read:",
- (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
- showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
-
- //runtime breakdown
+ (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
+ showBytesDistribution("task result size:",
+ (_, metric) => Some(metric.resultSize), taskInfoMetrics)
- val runtimePcts = stageCompleted.stage.taskInfos.map{
- case (info, metrics) => RuntimePercentage(info.duration, metrics)
+ // Runtime breakdown
+ val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
+ RuntimePercentage(info.duration, metrics)
}
showDistribution("executor (non-fetch) time pct: ",
- Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+ Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
showDistribution("fetch wait time pct: ",
- Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
- showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
+ Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
+ showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
+ taskInfoMetrics.clear()
}
}
private[spark] object StatsReportListener extends Logging {
- //for profiling, the extremes are more interesting
+ // For profiling, the extremes are more interesting
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
- val probabilities = percentiles.map{_ / 100.0}
+ val probabilities = percentiles.map(_ / 100.0)
val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
- def extractDoubleDistribution(stage: SparkListenerStageCompleted,
- getMetric: (TaskInfo,TaskMetrics) => Option[Double])
- : Option[Distribution] = {
- Distribution(stage.stage.taskInfos.flatMap {
- case ((info,metric)) => getMetric(info, metric)})
+ def extractDoubleDistribution(
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+ getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
+ Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
}
- //is there some way to setup the types that I can get rid of this completely?
- def extractLongDistribution(stage: SparkListenerStageCompleted,
- getMetric: (TaskInfo,TaskMetrics) => Option[Long])
- : Option[Distribution] = {
- extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
+ // Is there some way to setup the types that I can get rid of this completely?
+ def extractLongDistribution(
+ taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+ getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
+ extractDoubleDistribution(
+ taskInfoMetrics,
+ (info, metric) => { getMetric(info, metric).map(_.toDouble) })
}
  def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
    val stats = d.statCounter
-    val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    val quantiles = d.getQuantiles(probabilities).map(formatNumber) // row under percentilesHeader
    logInfo(heading + stats)
    logInfo(percentilesHeader)
    logInfo("\t" + quantiles.mkString("\t"))
  }
- def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String)
- {
+  def showDistribution( // Logs nothing when dOpt is None
+      heading: String,
+      dOpt: Option[Distribution],
+      formatNumber: Double => String) {
    dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
  }
  def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
-    def f(d:Double) = format.format(d)
+    def f(d: Double) = format.format(d) // printf-style pattern, e.g. "%2.0f %%"
    showDistribution(heading, dOpt, f _)
  }
  def showDistribution(
      heading: String,
      format: String,
-      getMetric: (TaskInfo, TaskMetrics) => Option[Double])
-      (implicit stage: SparkListenerStageCompleted) {
-    showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
+      getMetric: (TaskInfo, TaskMetrics) => Option[Double],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { // getMetric is applied to each pair
+    showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
  }
- def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
- (implicit stage: SparkListenerStageCompleted) {
- showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
+  def showBytesDistribution( // Delegates to the Option[Distribution] overload
+      heading: String,
+      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
  }
  def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
-    dOpt.foreach{dist => showBytesDistribution(heading, dist)}
+    dOpt.foreach { dist => showBytesDistribution(heading, dist) } // no-op when dOpt is None
  }
def showBytesDistribution(heading: String, dist: Distribution) {
@@ -189,9 +245,11 @@ private[spark] object StatsReportListener extends Logging {
(d => StatsReportListener.millisToString(d.toLong)): Double => String)
}
- def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
- (implicit stage: SparkListenerStageCompleted) {
- showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
+  def showMillisDistribution( // Long metric values, widened via extractLongDistribution
+      heading: String,
+      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
  }
val seconds = 1000L
@@ -199,7 +257,7 @@ private[spark] object StatsReportListener extends Logging {
val hours = minutes * 60
/**
- * reformat a time interval in milliseconds to a prettier format for output
+ * Reformat a time interval in milliseconds to a prettier format for output
*/
def millisToString(ms: Long) = {
val (size, units) =
@@ -221,8 +279,8 @@ private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Doubl
private object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
- val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
- val fetch = fetchTime.map{_ / denom}
+ val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
+ val fetch = fetchTime.map(_ / denom)
val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
val other = 1.0 - (exec + fetch.getOrElse(0d))
RuntimePercentage(exec, fetch, other)
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 17b1328..729e120 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -1,100 +1,67 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import org.apache.spark.Logging
-
-/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus extends Logging {
- private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
-
- /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
- * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
- private val EVENT_QUEUE_CAPACITY = 10000
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
- private var queueFullErrorMessageLogged = false
-
- // Create a new daemon thread to listen for events. This thread is stopped when it receives
- // a SparkListenerShutdown event, using the stop method.
- new Thread("SparkListenerBus") {
- setDaemon(true)
- override def run() {
- while (true) {
- val event = eventQueue.take
- event match {
- case stageSubmitted: SparkListenerStageSubmitted =>
- sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
- case stageCompleted: SparkListenerStageCompleted =>
- sparkListeners.foreach(_.onStageCompleted(stageCompleted))
- case jobStart: SparkListenerJobStart =>
- sparkListeners.foreach(_.onJobStart(jobStart))
- case jobEnd: SparkListenerJobEnd =>
- sparkListeners.foreach(_.onJobEnd(jobEnd))
- case taskStart: SparkListenerTaskStart =>
- sparkListeners.foreach(_.onTaskStart(taskStart))
- case taskGettingResult: SparkListenerTaskGettingResult =>
- sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
- case taskEnd: SparkListenerTaskEnd =>
- sparkListeners.foreach(_.onTaskEnd(taskEnd))
- case SparkListenerShutdown =>
- // Get out of the while loop and shutdown the daemon thread
- return
- case _ =>
- }
- }
- }
- }.start()
-
- def addListener(listener: SparkListener) {
- sparkListeners += listener
- }
-
- def post(event: SparkListenerEvents) {
- val eventAdded = eventQueue.offer(event)
- if (!eventAdded && !queueFullErrorMessageLogged) {
- logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
- "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
- "rate at which tasks are being started by the scheduler.")
- queueFullErrorMessageLogged = true
- }
- }
-
- /**
- * Waits until there are no more events in the queue, or until the specified time has elapsed.
- * Used for testing only. Returns true if the queue has emptied and false is the specified time
- * elapsed before the queue emptied.
- */
- def waitUntilEmpty(timeoutMillis: Int): Boolean = {
- val finishTime = System.currentTimeMillis + timeoutMillis
- while (!eventQueue.isEmpty) {
- if (System.currentTimeMillis > finishTime) {
- return false
- }
- /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
- * add overhead in the general case. */
- Thread.sleep(10)
- }
- true
- }
-
- def stop(): Unit = post(SparkListenerShutdown)
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A SparkListenerEvent bus that relays each event, via postToAll, to every attached listener.
+ */
+private[spark] trait SparkListenerBus {
+
+  // SparkListeners attached to this event bus, notified in registration order
+  protected val sparkListeners = new ArrayBuffer[SparkListener]
+    with mutable.SynchronizedBuffer[SparkListener]
+
+  def addListener(listener: SparkListener) {
+    sparkListeners += listener
+  }
+
+  /**
+   * Post an event to all attached listeners. This does nothing if the event is
+   * SparkListenerShutdown.
+   */
+  protected def postToAll(event: SparkListenerEvent) {
+    event match {
+      case stageSubmitted: SparkListenerStageSubmitted =>
+        sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+      case stageCompleted: SparkListenerStageCompleted =>
+        sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+      case jobStart: SparkListenerJobStart =>
+        sparkListeners.foreach(_.onJobStart(jobStart))
+      case jobEnd: SparkListenerJobEnd =>
+        sparkListeners.foreach(_.onJobEnd(jobEnd))
+      case taskStart: SparkListenerTaskStart =>
+        sparkListeners.foreach(_.onTaskStart(taskStart))
+      case taskGettingResult: SparkListenerTaskGettingResult =>
+        sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
+      case taskEnd: SparkListenerTaskEnd =>
+        sparkListeners.foreach(_.onTaskEnd(taskEnd))
+      case environmentUpdate: SparkListenerEnvironmentUpdate =>
+        sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
+      case blockManagerAdded: SparkListenerBlockManagerAdded =>
+        sparkListeners.foreach(_.onBlockManagerAdded(blockManagerAdded))
+      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
+        sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
+      case unpersistRDD: SparkListenerUnpersistRDD =>
+        sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+      case SparkListenerShutdown => // NOTE(review): no catch-all, unmatched events throw MatchError
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 8f320e5..8115a7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,28 +17,25 @@
package org.apache.spark.scheduler
-import scala.collection._
-
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.RDDInfo
/**
* Stores information about a stage to pass from the scheduler to SparkListeners.
- *
- * taskInfos stores the metrics for all tasks that have completed, including redundant, speculated
- * tasks.
*/
-class StageInfo(
- stage: Stage,
- val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
- mutable.Buffer[(TaskInfo, TaskMetrics)]()
-) {
- val stageId = stage.id
+private[spark] // See StageInfo.fromStage for construction from a Stage
+class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
  /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
  var submissionTime: Option[Long] = None
  var completionTime: Option[Long] = None
-  val rddName = stage.rdd.name
-  val name = stage.name
-  val numPartitions = stage.numPartitions
-  val numTasks = stage.numTasks
  var emittedTaskSizeWarning = false
}
+
+private[spark]
+object StageInfo {
+  def fromStage(stage: Stage): StageInfo = { // Snapshots stage id, name, numTasks and its RDD
+    val rdd = stage.rdd
+    val rddName = Option(rdd.name).getOrElse(rdd.id.toString) // null name -> RDD id
+    val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index ea3229b..308edb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler
private[spark] object TaskLocality extends Enumeration {
- // process local is expected to be used ONLY within tasksetmanager for now.
+ // Process local is expected to be used ONLY within TaskSetManager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
type TaskLocality = Value
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index abff252..30bceb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -204,7 +204,7 @@ private[spark] class TaskSchedulerImpl(
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
- executorGained(o.executorId, o.host)
+ executorAdded(o.executorId, o.host)
}
}
@@ -400,8 +400,8 @@ private[spark] class TaskSchedulerImpl(
rootPool.executorLost(executorId, host)
}
- def executorGained(execId: String, host: String) {
- dagScheduler.executorGained(execId, host)
+  def executorAdded(execId: String, host: String) { // Forwards to the DAGScheduler
+    dagScheduler.executorAdded(execId, host)
  }
def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {
http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ee4b65e..25b7472 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -18,7 +18,7 @@
package org.apache.spark.scheduler.cluster
import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.{Command, ApplicationDescription}
+import org.apache.spark.deploy.{ApplicationDescription, Command}
import org.apache.spark.deploy.client.{AppClient, AppClientListener}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
import org.apache.spark.util.Utils
@@ -26,8 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
scheduler: TaskSchedulerImpl,
sc: SparkContext,
- masters: Array[String],
- appName: String)
+ masters: Array[String])
extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
with AppClientListener
with Logging {
@@ -49,8 +48,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
- val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
- sparkHome, "http://" + sc.ui.appUIAddress)
+ val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+ sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()