Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/19 21:17:14 UTC

[5/6] [SPARK-1132] Persisting Web UI through refactoring the SparkListener interface

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index d83d034..77c558a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -32,7 +32,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
+import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -54,87 +54,53 @@ import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedH
  */
 private[spark]
 class DAGScheduler(
-    taskSched: TaskScheduler,
+    taskScheduler: TaskScheduler,
+    listenerBus: LiveListenerBus,
     mapOutputTracker: MapOutputTrackerMaster,
     blockManagerMaster: BlockManagerMaster,
     env: SparkEnv)
   extends Logging {
 
-  def this(taskSched: TaskScheduler) {
-    this(taskSched, SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
-      SparkEnv.get.blockManager.master, SparkEnv.get)
-  }
-  taskSched.setDAGScheduler(this)
+  import DAGScheduler._
 
-  // Called by TaskScheduler to report task's starting.
-  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! BeginEvent(task, taskInfo)
-  }
-
-  // Called to report that a task has completed and results are being fetched remotely.
-  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
-    eventProcessActor ! GettingResultEvent(task, taskInfo)
+  def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
+    this(
+      taskScheduler,
+      sc.listenerBus,
+      sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
+      sc.env.blockManager.master,
+      sc.env)
   }
 
-  // Called by TaskScheduler to report task completions or failures.
-  def taskEnded(
-      task: Task[_],
-      reason: TaskEndReason,
-      result: Any,
-      accumUpdates: Map[Long, Any],
-      taskInfo: TaskInfo,
-      taskMetrics: TaskMetrics) {
-    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
-  }
-
-  // Called by TaskScheduler when an executor fails.
-  def executorLost(execId: String) {
-    eventProcessActor ! ExecutorLost(execId)
-  }
-
-  // Called by TaskScheduler when a host is added
-  def executorGained(execId: String, host: String) {
-    eventProcessActor ! ExecutorGained(execId, host)
-  }
-
-  // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
-  // cancellation of the job itself.
-  def taskSetFailed(taskSet: TaskSet, reason: String) {
-    eventProcessActor ! TaskSetFailed(taskSet, reason)
-  }
-
-  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
-  // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
-  // as more failure events come in
-  val RESUBMIT_TIMEOUT = 200.milliseconds
-
-  // The time, in millis, to wake up between polls of the completion queue in order to potentially
-  // resubmit failed stages
-  val POLL_TIMEOUT = 10L
-
-  // Warns the user if a stage contains a task with size greater than this value (in KB)
-  val TASK_SIZE_TO_WARN = 100
+  def this(sc: SparkContext) = this(sc, sc.taskScheduler)
 
   private var eventProcessActor: ActorRef = _
 
   private[scheduler] val nextJobId = new AtomicInteger(0)
-
-  def numTotalJobs: Int = nextJobId.get()
-
+  private[scheduler] def numTotalJobs: Int = nextJobId.get()
   private val nextStageId = new AtomicInteger(0)
 
   private[scheduler] val jobIdToStageIds = new TimeStampedHashMap[Int, HashSet[Int]]
-
   private[scheduler] val stageIdToJobIds = new TimeStampedHashMap[Int, HashSet[Int]]
-
   private[scheduler] val stageIdToStage = new TimeStampedHashMap[Int, Stage]
-
   private[scheduler] val shuffleToMapStage = new TimeStampedHashMap[Int, Stage]
-
+  private[scheduler] val stageIdToActiveJob = new HashMap[Int, ActiveJob]
+  private[scheduler] val resultStageToJob = new HashMap[Stage, ActiveJob]
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  // An async scheduler event bus. The bus should be stopped when DAGSCheduler is stopped.
-  private[spark] val listenerBus = new SparkListenerBus
+  // Stages we need to run whose parents aren't done
+  private[scheduler] val waitingStages = new HashSet[Stage]
+
+  // Stages we are running right now
+  private[scheduler] val runningStages = new HashSet[Stage]
+
+  // Stages that must be resubmitted due to fetch failures
+  private[scheduler] val failedStages = new HashSet[Stage]
+
+  // Missing tasks from each stage
+  private[scheduler] val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
+
+  private[scheduler] val activeJobs = new HashSet[ActiveJob]
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -145,22 +111,12 @@ class DAGScheduler(
   //
   // TODO: Garbage collect information about failure epochs when we know there are no more
   //       stray messages to detect.
-  val failedEpoch = new HashMap[String, Long]
+  private val failedEpoch = new HashMap[String, Long]
 
-  // stage id to the active job
-  val idToActiveJob = new HashMap[Int, ActiveJob]
+  private val metadataCleaner =
+    new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
 
-  val waiting = new HashSet[Stage] // Stages we need to run whose parents aren't done
-  val running = new HashSet[Stage] // Stages we are running right now
-  val failed = new HashSet[Stage]  // Stages that must be resubmitted due to fetch failures
-  // Missing tasks from each stage
-  val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-
-  val activeJobs = new HashSet[ActiveJob]
-  val resultStageToJob = new HashMap[Stage, ActiveJob]
-
-  val metadataCleaner = new MetadataCleaner(
-    MetadataCleanerType.DAG_SCHEDULER, this.cleanup, env.conf)
+  taskScheduler.setDAGScheduler(this)
 
   /**
    * Starts the event processing actor.  The actor has two responsibilities:
@@ -196,13 +152,46 @@ class DAGScheduler(
     }))
   }
 
-  def addSparkListener(listener: SparkListener) {
-    listenerBus.addListener(listener)
+  // Called by TaskScheduler to report task's starting.
+  def taskStarted(task: Task[_], taskInfo: TaskInfo) {
+    eventProcessActor ! BeginEvent(task, taskInfo)
+  }
+
+  // Called to report that a task has completed and results are being fetched remotely.
+  def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
+    eventProcessActor ! GettingResultEvent(task, taskInfo)
+  }
+
+  // Called by TaskScheduler to report task completions or failures.
+  def taskEnded(
+      task: Task[_],
+      reason: TaskEndReason,
+      result: Any,
+      accumUpdates: Map[Long, Any],
+      taskInfo: TaskInfo,
+      taskMetrics: TaskMetrics) {
+    eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
+  }
+
+  // Called by TaskScheduler when an executor fails.
+  def executorLost(execId: String) {
+    eventProcessActor ! ExecutorLost(execId)
+  }
+
+  // Called by TaskScheduler when a host is added
+  def executorAdded(execId: String, host: String) {
+    eventProcessActor ! ExecutorAdded(execId, host)
+  }
+
+  // Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
+  // cancellation of the job itself.
+  def taskSetFailed(taskSet: TaskSet, reason: String) {
+    eventProcessActor ! TaskSetFailed(taskSet, reason)
   }
 
   private def getCacheLocs(rdd: RDD[_]): Array[Seq[TaskLocation]] = {
     if (!cacheLocs.contains(rdd.id)) {
-      val blockIds = rdd.partitions.indices.map(index=> RDDBlockId(rdd.id, index)).toArray[BlockId]
+      val blockIds = rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
       val locs = BlockManager.blockIdsToBlockManagers(blockIds, env, blockManagerMaster)
       cacheLocs(rdd.id) = blockIds.map { id =>
         locs.getOrElse(id, Nil).map(bm => TaskLocation(bm.host, bm.executorId))
@@ -250,7 +239,7 @@ class DAGScheduler(
       new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
-    stageToInfos(stage) = new StageInfo(stage)
+    stageToInfos(stage) = StageInfo.fromStage(stage)
     stage
   }
 
@@ -376,9 +365,9 @@ class DAGScheduler(
             def removeStage(stageId: Int) {
               // data structures based on Stage
               for (stage <- stageIdToStage.get(stageId)) {
-                if (running.contains(stage)) {
+                if (runningStages.contains(stage)) {
                   logDebug("Removing running stage %d".format(stageId))
-                  running -= stage
+                  runningStages -= stage
                 }
                 stageToInfos -= stage
                 for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
@@ -388,13 +377,13 @@ class DAGScheduler(
                   logDebug("Removing pending status for stage %d".format(stageId))
                 }
                 pendingTasks -= stage
-                if (waiting.contains(stage)) {
+                if (waitingStages.contains(stage)) {
                   logDebug("Removing stage %d from waiting set.".format(stageId))
-                  waiting -= stage
+                  waitingStages -= stage
                 }
-                if (failed.contains(stage)) {
+                if (failedStages.contains(stage)) {
                   logDebug("Removing stage %d from failed set.".format(stageId))
-                  failed -= stage
+                  failedStages -= stage
                 }
               }
               // data structures based on StageId
@@ -544,13 +533,14 @@ class DAGScheduler(
         logInfo("Missing parents: " + getMissingParentStages(finalStage))
         if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
           // Compute very short actions like first() or take() with no parent stages locally.
-          listenerBus.post(SparkListenerJobStart(job, Array(), properties))
+          listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
           runLocally(job)
         } else {
-          idToActiveJob(jobId) = job
+          stageIdToActiveJob(jobId) = job
           activeJobs += job
           resultStageToJob(finalStage) = job
-          listenerBus.post(SparkListenerJobStart(job, jobIdToStageIds(jobId).toArray, properties))
+          listenerBus.post(
+            SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
           submitStage(finalStage)
         }
 
@@ -563,23 +553,23 @@ class DAGScheduler(
         val activeInGroup = activeJobs.filter(activeJob =>
           groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
         val jobIds = activeInGroup.map(_.jobId)
-        jobIds.foreach { handleJobCancellation }
+        jobIds.foreach(handleJobCancellation)
 
       case AllJobsCancelled =>
         // Cancel all running jobs.
-        running.map(_.jobId).foreach { handleJobCancellation }
+        runningStages.map(_.jobId).foreach(handleJobCancellation)
         activeJobs.clear()      // These should already be empty by this point,
-        idToActiveJob.clear()   // but just in case we lost track of some jobs...
+        stageIdToActiveJob.clear()   // but just in case we lost track of some jobs...
 
-      case ExecutorGained(execId, host) =>
-        handleExecutorGained(execId, host)
+      case ExecutorAdded(execId, host) =>
+        handleExecutorAdded(execId, host)
 
       case ExecutorLost(execId) =>
         handleExecutorLost(execId)
 
       case BeginEvent(task, taskInfo) =>
         for (
-          job <- idToActiveJob.get(task.stageId);
+          job <- stageIdToActiveJob.get(task.stageId);
           stage <- stageIdToStage.get(task.stageId);
           stageInfo <- stageToInfos.get(stage)
         ) {
@@ -591,20 +581,22 @@ class DAGScheduler(
               task.stageId, stageInfo.name, taskInfo.serializedSize / 1024, TASK_SIZE_TO_WARN))
           }
         }
-        listenerBus.post(SparkListenerTaskStart(task, taskInfo))
+        listenerBus.post(SparkListenerTaskStart(task.stageId, taskInfo))
 
       case GettingResultEvent(task, taskInfo) =>
-        listenerBus.post(SparkListenerTaskGettingResult(task, taskInfo))
+        listenerBus.post(SparkListenerTaskGettingResult(taskInfo))
 
       case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
-        listenerBus.post(SparkListenerTaskEnd(task, reason, taskInfo, taskMetrics))
+        val stageId = task.stageId
+        val taskType = Utils.getFormattedClassName(task)
+        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, reason, taskInfo, taskMetrics))
         handleTaskCompletion(completion)
 
       case TaskSetFailed(taskSet, reason) =>
         stageIdToStage.get(taskSet.stageId).foreach { abortStage(_, reason) }
 
       case ResubmitFailedStages =>
-        if (failed.size > 0) {
+        if (failedStages.size > 0) {
           // Failed stages may be removed by job cancellation, so failed might be empty even if
           // the ResubmitFailedStages event has been scheduled.
           resubmitFailedStages()
@@ -615,7 +607,7 @@ class DAGScheduler(
         for (job <- activeJobs) {
           val error = new SparkException("Job cancelled because SparkContext was shut down")
           job.listener.jobFailed(error)
-          listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, None)))
+          listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, -1)))
         }
         return true
     }
@@ -629,9 +621,9 @@ class DAGScheduler(
   private[scheduler] def resubmitFailedStages() {
     logInfo("Resubmitting failed stages")
     clearCacheLocs()
-    val failed2 = failed.toArray
-    failed.clear()
-    for (stage <- failed2.sortBy(_.jobId)) {
+    val failedStagesCopy = failedStages.toArray
+    failedStages.clear()
+    for (stage <- failedStagesCopy.sortBy(_.jobId)) {
       submitStage(stage)
     }
   }
@@ -644,12 +636,12 @@ class DAGScheduler(
     // TODO: We might want to run this less often, when we are sure that something has become
     // runnable that wasn't before.
     logTrace("Checking for newly runnable parent stages")
-    logTrace("running: " + running)
-    logTrace("waiting: " + waiting)
-    logTrace("failed: " + failed)
-    val waiting2 = waiting.toArray
-    waiting.clear()
-    for (stage <- waiting2.sortBy(_.jobId)) {
+    logTrace("running: " + runningStages)
+    logTrace("waiting: " + waitingStages)
+    logTrace("failed: " + failedStages)
+    val waitingStagesCopy = waitingStages.toArray
+    waitingStages.clear()
+    for (stage <- waitingStagesCopy.sortBy(_.jobId)) {
       submitStage(stage)
     }
   }
@@ -685,7 +677,7 @@ class DAGScheduler(
       }
     } catch {
       case e: Exception =>
-        jobResult = JobFailed(e, Some(job.finalStage))
+        jobResult = JobFailed(e, job.finalStage.id)
         job.listener.jobFailed(e)
     } finally {
       val s = job.finalStage
@@ -693,7 +685,7 @@ class DAGScheduler(
       stageIdToStage -= s.id     // but that won't get cleaned up via the normal paths through
       stageToInfos -= s          // completion events or stage abort
       jobIdToStageIds -= job.jobId
-      listenerBus.post(SparkListenerJobEnd(job, jobResult))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, jobResult))
     }
   }
 
@@ -705,7 +697,7 @@ class DAGScheduler(
   private def activeJobForStage(stage: Stage): Option[Int] = {
     if (stageIdToJobIds.contains(stage.id)) {
       val jobsThatUseStage: Array[Int] = stageIdToJobIds(stage.id).toArray.sorted
-      jobsThatUseStage.find(idToActiveJob.contains(_))
+      jobsThatUseStage.find(stageIdToActiveJob.contains)
     } else {
       None
     }
@@ -716,18 +708,18 @@ class DAGScheduler(
     val jobId = activeJobForStage(stage)
     if (jobId.isDefined) {
       logDebug("submitStage(" + stage + ")")
-      if (!waiting(stage) && !running(stage) && !failed(stage)) {
+      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
         val missing = getMissingParentStages(stage).sortBy(_.id)
         logDebug("missing: " + missing)
         if (missing == Nil) {
           logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
           submitMissingTasks(stage, jobId.get)
-          running += stage
+          runningStages += stage
         } else {
           for (parent <- missing) {
             submitStage(parent)
           }
-          waiting += stage
+          waitingStages += stage
         }
       }
     } else {
@@ -758,8 +750,8 @@ class DAGScheduler(
       }
     }
 
-    val properties = if (idToActiveJob.contains(jobId)) {
-      idToActiveJob(stage.jobId).properties
+    val properties = if (stageIdToActiveJob.contains(jobId)) {
+      stageIdToActiveJob(stage.jobId).properties
     } else {
       //this stage will be assigned to "default" pool
       null
@@ -779,20 +771,20 @@ class DAGScheduler(
       } catch {
         case e: NotSerializableException =>
           abortStage(stage, "Task not serializable: " + e.toString)
-          running -= stage
+          runningStages -= stage
           return
       }
 
       logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
       myPending ++= tasks
       logDebug("New pending tasks: " + myPending)
-      taskSched.submitTasks(
+      taskScheduler.submitTasks(
         new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
       stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
     } else {
       logDebug("Stage " + stage + " is actually done; %b %d %d".format(
         stage.isAvailable, stage.numAvailableOutputs, stage.numPartitions))
-      running -= stage
+      runningStages -= stage
     }
   }
 
@@ -817,7 +809,7 @@ class DAGScheduler(
       logInfo("%s (%s) finished in %s s".format(stage, stage.name, serviceTime))
       stageToInfos(stage).completionTime = Some(System.currentTimeMillis())
       listenerBus.post(SparkListenerStageCompleted(stageToInfos(stage)))
-      running -= stage
+      runningStages -= stage
     }
     event.reason match {
       case Success =>
@@ -826,7 +818,6 @@ class DAGScheduler(
           Accumulators.add(event.accumUpdates) // TODO: do this only if task wasn't resubmitted
         }
         pendingTasks(stage) -= task
-        stageToInfos(stage).taskInfos += event.taskInfo -> event.taskMetrics
         task match {
           case rt: ResultTask[_, _] =>
             resultStageToJob.get(stage) match {
@@ -836,12 +827,12 @@ class DAGScheduler(
                   job.numFinished += 1
                   // If the whole job has finished, remove it
                   if (job.numFinished == job.numPartitions) {
-                    idToActiveJob -= stage.jobId
+                    stageIdToActiveJob -= stage.jobId
                     activeJobs -= job
                     resultStageToJob -= stage
                     markStageAsFinished(stage)
                     jobIdToStageIdsRemove(job.jobId)
-                    listenerBus.post(SparkListenerJobEnd(job, JobSucceeded))
+                    listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
                   }
                   job.listener.taskSucceeded(rt.outputId, event.result)
                 }
@@ -858,12 +849,12 @@ class DAGScheduler(
             } else {
               stage.addOutputLoc(smt.partitionId, status)
             }
-            if (running.contains(stage) && pendingTasks(stage).isEmpty) {
+            if (runningStages.contains(stage) && pendingTasks(stage).isEmpty) {
               markStageAsFinished(stage)
               logInfo("looking for newly runnable stages")
-              logInfo("running: " + running)
-              logInfo("waiting: " + waiting)
-              logInfo("failed: " + failed)
+              logInfo("running: " + runningStages)
+              logInfo("waiting: " + waitingStages)
+              logInfo("failed: " + failedStages)
               if (stage.shuffleDep.isDefined) {
                 // We supply true to increment the epoch number here in case this is a
                 // recomputation of the map outputs. In that case, some nodes may have cached
@@ -886,14 +877,14 @@ class DAGScheduler(
                 submitStage(stage)
               } else {
                 val newlyRunnable = new ArrayBuffer[Stage]
-                for (stage <- waiting) {
+                for (stage <- waitingStages) {
                   logInfo("Missing parents for " + stage + ": " + getMissingParentStages(stage))
                 }
-                for (stage <- waiting if getMissingParentStages(stage) == Nil) {
+                for (stage <- waitingStages if getMissingParentStages(stage) == Nil) {
                   newlyRunnable += stage
                 }
-                waiting --= newlyRunnable
-                running ++= newlyRunnable
+                waitingStages --= newlyRunnable
+                runningStages ++= newlyRunnable
                 for {
                   stage <- newlyRunnable.sortBy(_.id)
                   jobId <- activeJobForStage(stage)
@@ -912,7 +903,7 @@ class DAGScheduler(
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
-        running -= failedStage
+        runningStages -= failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
@@ -924,7 +915,7 @@ class DAGScheduler(
         }
         logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
-        if (failed.isEmpty && eventProcessActor != null) {
+        if (failedStages.isEmpty && eventProcessActor != null) {
           // Don't schedule an event to resubmit failed stages if failed isn't empty, because
           // in that case the event will already have been scheduled. eventProcessActor may be
           // null during unit tests.
@@ -932,8 +923,8 @@ class DAGScheduler(
           env.actorSystem.scheduler.scheduleOnce(
             RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
         }
-        failed += failedStage
-        failed += mapStage
+        failedStages += failedStage
+        failedStages += mapStage
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
@@ -980,10 +971,10 @@ class DAGScheduler(
     }
   }
 
-  private def handleExecutorGained(execId: String, host: String) {
+  private def handleExecutorAdded(execId: String, host: String) {
     // remove from failedEpoch(execId) ?
     if (failedEpoch.contains(execId)) {
-      logInfo("Host gained which was in lost list earlier: " + host)
+      logInfo("Host added was in lost list earlier: " + host)
       failedEpoch -= execId
     }
   }
@@ -993,14 +984,14 @@ class DAGScheduler(
       logDebug("Trying to cancel unregistered job " + jobId)
     } else {
       val independentStages = removeJobAndIndependentStages(jobId)
-      independentStages.foreach { taskSched.cancelTasks }
+      independentStages.foreach(taskScheduler.cancelTasks)
       val error = new SparkException("Job %d cancelled".format(jobId))
-      val job = idToActiveJob(jobId)
+      val job = stageIdToActiveJob(jobId)
       job.listener.jobFailed(error)
       jobIdToStageIds -= jobId
       activeJobs -= job
-      idToActiveJob -= jobId
-      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(job.finalStage))))
+      stageIdToActiveJob -= jobId
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, job.finalStage.id)))
     }
   }
 
@@ -1020,10 +1011,10 @@ class DAGScheduler(
       val error = new SparkException("Job aborted: " + reason)
       job.listener.jobFailed(error)
       jobIdToStageIdsRemove(job.jobId)
-      idToActiveJob -= resultStage.jobId
+      stageIdToActiveJob -= resultStage.jobId
       activeJobs -= job
       resultStageToJob -= resultStage
-      listenerBus.post(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))
+      listenerBus.post(SparkListenerJobEnd(job.jobId, JobFailed(error, failedStage.id)))
     }
     if (dependentStages.isEmpty) {
       logInfo("Ignoring failure of " + failedStage + " because all jobs depending on it are done")
@@ -1102,11 +1093,11 @@ class DAGScheduler(
       "stageToInfos" -> stageToInfos,
       "jobIdToStageIds" -> jobIdToStageIds,
       "stageIdToJobIds" -> stageIdToJobIds).
-      foreach { case(s, t) => {
-      val sizeBefore = t.size
-      t.clearOldValues(cleanupTime)
-      logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
-    }}
+      foreach { case (s, t) =>
+        val sizeBefore = t.size
+        t.clearOldValues(cleanupTime)
+        logInfo("%s %d --> %d".format(s, sizeBefore, t.size))
+      }
   }
 
   def stop() {
@@ -1114,7 +1105,20 @@ class DAGScheduler(
       eventProcessActor ! StopDAGScheduler
     }
     metadataCleaner.cancel()
-    taskSched.stop()
-    listenerBus.stop()
+    taskScheduler.stop()
   }
 }
+
+private[spark] object DAGScheduler {
+  // The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
+  // this is a simplistic way to avoid resubmitting tasks in the non-fetchable map stage one by one
+  // as more failure events come in
+  val RESUBMIT_TIMEOUT = 200.milliseconds
+
+  // The time, in millis, to wake up between polls of the completion queue in order to potentially
+  // resubmit failed stages
+  val POLL_TIMEOUT = 10L
+
+  // Warns the user if a stage contains a task with size greater than this value (in KB)
+  val TASK_SIZE_TO_WARN = 100
+}
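
As a rough illustration of the refactored interface this patch targets (a sketch, not part of the
patch itself): SparkListener events now carry plain IDs plus StageInfo/TaskInfo values instead of
scheduler-internal Stage/Task/ActiveJob objects. The accessors used below (stageInfo.stageId,
jobEnd.jobId, and so on) are the ones exercised elsewhere in this commit; the listener class and
its field names are invented for the example.

  import scala.collection.mutable.HashMap
  import org.apache.spark.scheduler._

  class StageTimingListener extends SparkListener {
    // stageId -> wall-clock time at which the stage was submitted
    private val submissionTimes = new HashMap[Int, Long]

    override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
      // Only the StageInfo is exposed, not the internal Stage object
      submissionTimes(stageSubmitted.stageInfo.stageId) = System.currentTimeMillis
    }

    override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
      val stageId = stageCompleted.stageInfo.stageId
      submissionTimes.remove(stageId).foreach { start =>
        println("Stage %d finished after %d ms".format(stageId, System.currentTimeMillis - start))
      }
    }

    override def onJobEnd(jobEnd: SparkListenerJobEnd) {
      // Jobs are now identified by jobId rather than by the ActiveJob itself
      println("Job %d ended".format(jobEnd.jobId))
    }
  }

A listener of this shape is registered through SparkContext.addSparkListener, as described in the
JobLogger scaladoc further below.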

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 39cd98e..04c53d4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -65,7 +65,7 @@ private[scheduler] case class CompletionEvent(
     taskMetrics: TaskMetrics)
   extends DAGSchedulerEvent
 
-private[scheduler] case class ExecutorGained(execId: String, host: String) extends DAGSchedulerEvent
+private[scheduler] case class ExecutorAdded(execId: String, host: String) extends DAGSchedulerEvent
 
 private[scheduler] case class ExecutorLost(execId: String) extends DAGSchedulerEvent
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
index b52fe24..5878e73 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala
@@ -28,15 +28,15 @@ private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: Spar
   val sourceName = "%s.DAGScheduler".format(sc.appName)
 
   metricRegistry.register(MetricRegistry.name("stage", "failedStages"), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.failed.size
+    override def getValue: Int = dagScheduler.failedStages.size
   })
 
   metricRegistry.register(MetricRegistry.name("stage", "runningStages"), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.running.size
+    override def getValue: Int = dagScheduler.runningStages.size
   })
 
   metricRegistry.register(MetricRegistry.name("stage", "waitingStages"), new Gauge[Int] {
-    override def getValue: Int = dagScheduler.waiting.size
+    override def getValue: Int = dagScheduler.waitingStages.size
   })
 
   metricRegistry.register(MetricRegistry.name("job", "allJobs"), new Gauge[Int] {

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
new file mode 100644
index 0000000..217f882
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, FileLogger}
+
+/**
+ * A SparkListener that logs events to persistent storage.
+ *
+ * Event logging is specified by the following configurable parameters:
+ *   spark.eventLog.enabled - Whether event logging is enabled.
+ *   spark.eventLog.compress - Whether to compress logged events
+ *   spark.eventLog.overwrite - Whether to overwrite any existing files.
+ *   spark.eventLog.dir - Path to the directory in which events are logged.
+ *   spark.eventLog.buffer.kb - Buffer size to use when writing to output streams
+ */
+private[spark] class EventLoggingListener(appName: String, conf: SparkConf)
+  extends SparkListener with Logging {
+
+  private val shouldCompress = conf.getBoolean("spark.eventLog.compress", false)
+  private val shouldOverwrite = conf.getBoolean("spark.eventLog.overwrite", false)
+  private val outputBufferSize = conf.getInt("spark.eventLog.buffer.kb", 100) * 1024
+  private val logBaseDir = conf.get("spark.eventLog.dir", "/tmp/spark-events").stripSuffix("/")
+  private val name = appName.replaceAll("[ :/]", "-").toLowerCase + "-" + System.currentTimeMillis
+  val logDir = logBaseDir + "/" + name
+
+  private val logger =
+    new FileLogger(logDir, conf, outputBufferSize, shouldCompress, shouldOverwrite)
+
+  // Information needed to replay the events logged by this listener later
+  val info = {
+    val compressionCodec = if (shouldCompress) {
+      Some(conf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC))
+    } else None
+    EventLoggingInfo(logDir, compressionCodec)
+  }
+
+  logInfo("Logging events to %s".format(logDir))
+
+  /** Log the event as JSON */
+  private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false) {
+    val eventJson = compact(render(JsonProtocol.sparkEventToJson(event)))
+    logger.logLine(eventJson)
+    if (flushLogger) {
+      logger.flush()
+    }
+  }
+
+  // Events that do not trigger a flush
+  override def onStageSubmitted(event: SparkListenerStageSubmitted) =
+    logEvent(event)
+  override def onTaskStart(event: SparkListenerTaskStart) =
+    logEvent(event)
+  override def onTaskGettingResult(event: SparkListenerTaskGettingResult) =
+    logEvent(event)
+  override def onTaskEnd(event: SparkListenerTaskEnd) =
+    logEvent(event)
+  override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate) =
+    logEvent(event)
+
+  // Events that trigger a flush
+  override def onStageCompleted(event: SparkListenerStageCompleted) =
+    logEvent(event, flushLogger = true)
+  override def onJobStart(event: SparkListenerJobStart) =
+    logEvent(event, flushLogger = true)
+  override def onJobEnd(event: SparkListenerJobEnd) =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded) =
+    logEvent(event, flushLogger = true)
+  override def onBlockManagerRemoved(event: SparkListenerBlockManagerRemoved) =
+    logEvent(event, flushLogger = true)
+  override def onUnpersistRDD(event: SparkListenerUnpersistRDD) =
+    logEvent(event, flushLogger = true)
+
+  def stop() = logger.stop()
+}
+
+// If compression is not enabled, compressionCodec is None
+private[spark] case class EventLoggingInfo(logDir: String, compressionCodec: Option[String])
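
A quick sketch (not part of this patch) of enabling the new event log from application code. The
configuration keys are the ones documented in the scaladoc above; the master, application name and
directory are placeholder values.

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setMaster("local")
    .setAppName("event-log-example")
    .set("spark.eventLog.enabled", "true")            // attach an EventLoggingListener to the context
    .set("spark.eventLog.compress", "true")           // compress logged events with the configured codec
    .set("spark.eventLog.dir", "/tmp/spark-events")   // base directory for per-application log dirs
    .set("spark.eventLog.buffer.kb", "100")           // output buffer size in KB
  val sc = new SparkContext(conf)
  // Events are then written one JSON object per line under a directory named after the
  // application, e.g. /tmp/spark-events/event-log-example-<timestamp>.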

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 01cbcc3..b3a67d7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -22,24 +22,25 @@ import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 
-import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
+import scala.collection.mutable.HashMap
 
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.rdd.RDD
-import org.apache.spark.storage.StorageLevel
 
 /**
  * A logger class to record runtime information for jobs in Spark. This class outputs one log file
- * for each Spark job, containing RDD graph, tasks start/stop, shuffle information.
- * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext
- * after the SparkContext is created.
- * Note that each JobLogger only works for one SparkContext
- * @param logDirName The base directory for the log files.
+ * for each Spark job, containing task start/stop and shuffle information. JobLogger is a subclass
+ * of SparkListener; use addSparkListener to add a JobLogger to a SparkContext after the SparkContext
+ * is created. Note that each JobLogger works for only one SparkContext.
+ *
+ * NOTE: The functionality of this class is heavily stripped down to accommodate a general
+ * refactor of the SparkListener interface. In its place, the EventLoggingListener is introduced
+ * to log application information as SparkListenerEvents. To enable this functionality, set
+ * spark.eventLog.enabled to true.
  */
 
-class JobLogger(val user: String, val logDirName: String)
-  extends SparkListener with Logging {
+@deprecated("Log application information by setting spark.eventLog.enabled.", "1.0.0")
+class JobLogger(val user: String, val logDirName: String) extends SparkListener with Logging {
 
   def this() = this(System.getProperty("user.name", "<unknown>"),
     String.valueOf(System.currentTimeMillis()))
@@ -51,19 +52,19 @@ class JobLogger(val user: String, val logDirName: String)
       "/tmp/spark-%s".format(user)
     }
 
-  private val jobIDToPrintWriter = new HashMap[Int, PrintWriter]
-  private val stageIDToJobID = new HashMap[Int, Int]
-  private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]]
+  private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
+  private val stageIdToJobId = new HashMap[Int, Int]
+  private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
   private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]
 
   createLogDir()
 
   // The following 5 functions are used only in testing.
   private[scheduler] def getLogDir = logDir
-  private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter
-  private[scheduler] def getStageIDToJobID = stageIDToJobID
-  private[scheduler] def getJobIDToStages = jobIDToStages
+  private[scheduler] def getJobIdToPrintWriter = jobIdToPrintWriter
+  private[scheduler] def getStageIdToJobId = stageIdToJobId
+  private[scheduler] def getJobIdToStageIds = jobIdToStageIds
   private[scheduler] def getEventQueue = eventQueue
 
   /** Create a folder for log files, the folder's name is the creation time of jobLogger */
@@ -80,187 +81,78 @@ class JobLogger(val user: String, val logDirName: String)
 
   /**
    * Create a log file for one job
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @throws FileNotFoundException Fail to create log file
    */
-  protected def createLogWriter(jobID: Int) {
+  protected def createLogWriter(jobId: Int) {
     try {
-      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID)
-      jobIDToPrintWriter += (jobID -> fileWriter)
+      val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobId)
+      jobIdToPrintWriter += (jobId -> fileWriter)
     } catch {
       case e: FileNotFoundException => e.printStackTrace()
     }
   }
 
   /**
-   * Close log file, and clean the stage relationship in stageIDToJobID
-   * @param jobID ID of the job
+   * Close log file, and clean the stage relationship in stageIdToJobId
+   * @param jobId ID of the job
    */
-  protected def closeLogWriter(jobID: Int) {
-    jobIDToPrintWriter.get(jobID).foreach { fileWriter =>
+  protected def closeLogWriter(jobId: Int) {
+    jobIdToPrintWriter.get(jobId).foreach { fileWriter =>
       fileWriter.close()
-      jobIDToStages.get(jobID).foreach(_.foreach{ stage =>
-        stageIDToJobID -= stage.id
+      jobIdToStageIds.get(jobId).foreach(_.foreach { stageId =>
+        stageIdToJobId -= stageId
       })
-      jobIDToPrintWriter -= jobID
-      jobIDToStages -= jobID
+      jobIdToPrintWriter -= jobId
+      jobIdToStageIds -= jobId
     }
   }
 
   /**
+   * Build up the maps that represent stage-job relationships
+   * @param jobId ID of the job
+   * @param stageIds IDs of the associated stages
+   */
+  protected def buildJobStageDependencies(jobId: Int, stageIds: Seq[Int]) = {
+    jobIdToStageIds(jobId) = stageIds
+    stageIds.foreach { stageId => stageIdToJobId(stageId) = jobId }
+  }
+
+  /**
    * Write info into log file
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @param info Info to be recorded
    * @param withTime Controls whether to record time stamp before the info, default is true
    */
-  protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) {
+  protected def jobLogInfo(jobId: Int, info: String, withTime: Boolean = true) {
     var writeInfo = info
     if (withTime) {
       val date = new Date(System.currentTimeMillis())
       writeInfo = DATE_FORMAT.format(date) + ": " + info
     }
-    jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo))
+    jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
   }
 
   /**
    * Write info into log file
-   * @param stageID ID of the stage
+   * @param stageId ID of the stage
    * @param info Info to be recorded
    * @param withTime Controls whether to record time stamp before the info, default is true
    */
-  protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) {
-    stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime))
-  }
-
-  /**
-   * Build stage dependency for a job
-   * @param jobID ID of the job
-   * @param stage Root stage of the job
-   */
-  protected def buildJobDep(jobID: Int, stage: Stage) {
-    if (stage.jobId == jobID) {
-      jobIDToStages.get(jobID) match {
-        case Some(stageList) => stageList += stage
-        case None => val stageList = new  ListBuffer[Stage]
-                     stageList += stage
-                     jobIDToStages += (jobID -> stageList)
-      }
-      stageIDToJobID += (stage.id -> jobID)
-      stage.parents.foreach(buildJobDep(jobID, _))
-    }
-  }
-
-  /**
-   * Record stage dependency and RDD dependency for a stage
-   * @param jobID Job ID of the stage
-   */
-  protected def recordStageDep(jobID: Int) {
-    def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = {
-      var rddList = new ListBuffer[RDD[_]]
-      rddList += rdd
-      rdd.dependencies.foreach {
-        case shufDep: ShuffleDependency[_, _] =>
-        case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd)
-      }
-      rddList
-    }
-    jobIDToStages.get(jobID).foreach {_.foreach { stage =>
-        var depRddDesc: String = ""
-        getRddsInStage(stage.rdd).foreach { rdd =>
-          depRddDesc += rdd.id + ","
-        }
-        var depStageDesc: String = ""
-        stage.parents.foreach { stage =>
-          depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")"
-        }
-        jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" +
-                   depRddDesc.substring(0, depRddDesc.length - 1) + ")" +
-                   " STAGE_DEP=" + depStageDesc, false)
-      }
-    }
-  }
-
-  /**
-   * Generate indents and convert to String
-   * @param indent Number of indents
-   * @return string of indents
-   */
-  protected def indentString(indent: Int): String = {
-    val sb = new StringBuilder()
-    for (i <- 1 to indent) {
-      sb.append(" ")
-    }
-    sb.toString()
-  }
-
-  /**
-   * Get RDD's name
-   * @param rdd Input RDD
-   * @return String of RDD's name
-   */
-  protected def getRddName(rdd: RDD[_]): String = {
-    var rddName = rdd.getClass.getSimpleName
-    if (rdd.name != null) {
-      rddName = rdd.name
-    }
-    rddName
-  }
-
-  /**
-   * Record RDD dependency graph in a stage
-   * @param jobID Job ID of the stage
-   * @param rdd Root RDD of the stage
-   * @param indent Indent number before info
-   */
-  protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) {
-    val cacheStr = if (rdd.getStorageLevel != StorageLevel.NONE) "CACHED" else "NONE"
-    val rddInfo =
-      s"RDD_ID=$rdd.id ${getRddName(rdd)} $cacheStr " +
-      s"${rdd.getCreationSite} ${rdd.creationSiteInfo.firstUserClass}"
-    jobLogInfo(jobID, indentString(indent) + rddInfo, false)
-    rdd.dependencies.foreach {
-      case shufDep: ShuffleDependency[_, _] =>
-        val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId
-        jobLogInfo(jobID, indentString(indent + 1) + depInfo, false)
-      case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1)
-    }
-  }
-
-  /**
-   * Record stage dependency graph of a job
-   * @param jobID Job ID of the stage
-   * @param stage Root stage of the job
-   * @param indent Indent number before info, default is 0
-   */
-  protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0)
-  {
-    val stageInfo = if (stage.isShuffleMap) {
-      "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId
-    } else {
-      "STAGE_ID=" + stage.id + " RESULT_STAGE"
-    }
-    if (stage.jobId == jobID) {
-      jobLogInfo(jobID, indentString(indent) + stageInfo, false)
-      if (!idSet.contains(stage.id)) {
-        idSet += stage.id
-        recordRddInStageGraph(jobID, stage.rdd, indent)
-        stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2))
-      }
-    } else {
-      jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false)
-    }
+  protected def stageLogInfo(stageId: Int, info: String, withTime: Boolean = true) {
+    stageIdToJobId.get(stageId).foreach(jobId => jobLogInfo(jobId, info, withTime))
   }
 
   /**
    * Record task metrics into job log files, including execution info and shuffle metrics
-   * @param stageID Stage ID of the task
+   * @param stageId Stage ID of the task
    * @param status Status info of the task
    * @param taskInfo Task description info
    * @param taskMetrics Task running metrics
    */
-  protected def recordTaskMetrics(stageID: Int, status: String,
+  protected def recordTaskMetrics(stageId: Int, status: String,
                                 taskInfo: TaskInfo, taskMetrics: TaskMetrics) {
-    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID +
+    val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageId +
                " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime +
                " EXECUTOR_ID=" + taskInfo.executorId +  " HOST=" + taskMetrics.hostname
     val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime
@@ -278,7 +170,7 @@ class JobLogger(val user: String, val logDirName: String)
       case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten
       case None => ""
     }
-    stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics)
+    stageLogInfo(stageId, status + info + executorRunTime + readMetrics + writeMetrics)
   }
 
   /**
@@ -286,8 +178,9 @@ class JobLogger(val user: String, val logDirName: String)
    * @param stageSubmitted Stage submitted event
    */
   override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-    stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
-        stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks))
+    val stageInfo = stageSubmitted.stageInfo
+    stageLogInfo(stageInfo.stageId, "STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format(
+      stageInfo.stageId, stageInfo.numTasks))
   }
 
   /**
@@ -295,36 +188,30 @@ class JobLogger(val user: String, val logDirName: String)
    * @param stageCompleted Stage completed event
    */
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-    stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format(
-        stageCompleted.stage.stageId))
+    val stageId = stageCompleted.stageInfo.stageId
+    stageLogInfo(stageId, "STAGE_ID=%d STATUS=COMPLETED".format(stageId))
   }
 
-  override def onTaskStart(taskStart: SparkListenerTaskStart) { }
-
   /**
    * When task ends, record task completion status and metrics
    * @param taskEnd Task end event
    */
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    val task = taskEnd.task
     val taskInfo = taskEnd.taskInfo
-    var taskStatus = ""
-    task match {
-      case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK"
-      case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK"
-    }
+    var taskStatus = "TASK_TYPE=%s".format(taskEnd.taskType)
+    val taskMetrics = if (taskEnd.taskMetrics != null) taskEnd.taskMetrics else TaskMetrics.empty()
     taskEnd.reason match {
       case Success => taskStatus += " STATUS=SUCCESS"
-        recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics)
+        recordTaskMetrics(taskEnd.stageId, taskStatus, taskInfo, taskMetrics)
       case Resubmitted =>
         taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId +
-                      " STAGE_ID=" + task.stageId
-        stageLogInfo(task.stageId, taskStatus)
+                      " STAGE_ID=" + taskEnd.stageId
+        stageLogInfo(taskEnd.stageId, taskStatus)
       case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
         taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" +
-                      task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
+                      taskEnd.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" +
                       mapId + " REDUCE_ID=" + reduceId
-        stageLogInfo(task.stageId, taskStatus)
+        stageLogInfo(taskEnd.stageId, taskStatus)
       case _ =>
     }
   }
@@ -334,8 +221,8 @@ class JobLogger(val user: String, val logDirName: String)
    * @param jobEnd Job end event
    */
   override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    val job = jobEnd.job
-    var info = "JOB_ID=" + job.jobId
+    val jobId = jobEnd.jobId
+    var info = "JOB_ID=" + jobId
     jobEnd.jobResult match {
       case JobSucceeded => info += " STATUS=SUCCESS"
       case JobFailed(exception, _) =>
@@ -343,19 +230,19 @@ class JobLogger(val user: String, val logDirName: String)
         exception.getMessage.split("\\s+").foreach(info += _ + "_")
       case _ =>
     }
-    jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase)
-    closeLogWriter(job.jobId)
+    jobLogInfo(jobId, info.substring(0, info.length - 1).toUpperCase)
+    closeLogWriter(jobId)
   }
 
   /**
    * Record job properties into job log file
-   * @param jobID ID of the job
+   * @param jobId ID of the job
    * @param properties Properties of the job
    */
-  protected def recordJobProperties(jobID: Int, properties: Properties) {
-    if(properties != null) {
+  protected def recordJobProperties(jobId: Int, properties: Properties) {
+    if (properties != null) {
       val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "")
-      jobLogInfo(jobID, description, false)
+      jobLogInfo(jobId, description, false)
     }
   }
 
@@ -364,14 +251,11 @@ class JobLogger(val user: String, val logDirName: String)
    * @param jobStart Job start event
    */
   override def onJobStart(jobStart: SparkListenerJobStart) {
-    val job = jobStart.job
+    val jobId = jobStart.jobId
     val properties = jobStart.properties
-    createLogWriter(job.jobId)
-    recordJobProperties(job.jobId, properties)
-    buildJobDep(job.jobId, job.finalStage)
-    recordStageDep(job.jobId)
-    recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int])
-    jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED")
+    createLogWriter(jobId)
+    recordJobProperties(jobId, properties)
+    buildJobStageDependencies(jobId, jobStart.stageIds)
+    jobLogInfo(jobId, "JOB_ID=" + jobId + " STATUS=STARTED")
   }
 }
-
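
For completeness, a minimal sketch (not part of this patch) of attaching the now-deprecated
JobLogger by hand, as its scaladoc describes; the master and application name are placeholder
values. New applications should set spark.eventLog.enabled=true instead.

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.spark.scheduler.JobLogger

  val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("job-logger-example"))
  sc.addSparkListener(new JobLogger())  // one plain-text log file per job under the logger's directory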

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
index d94f6ad..3cf4e30 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobResult.scala
@@ -23,5 +23,6 @@ package org.apache.spark.scheduler
 private[spark] sealed trait JobResult
 
 private[spark] case object JobSucceeded extends JobResult
-private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage])
-  extends JobResult
+
+// A failed stage ID of -1 means that no particular stage caused the failure
+private[spark] case class JobFailed(exception: Exception, failedStageId: Int) extends JobResult
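
A rough sketch (not part of this patch) of consuming the new JobFailed shape, for instance from a
listener's onJobEnd handler. JobResult and its cases are still private[spark], so this only applies
to code inside the org.apache.spark packages; the helper name is invented for the example.

  def describeJobResult(result: JobResult): String = result match {
    case JobSucceeded => "succeeded"
    case JobFailed(e, -1) => "failed (%s); no single stage to blame".format(e.getMessage)
    case JobFailed(e, stageId) => "failed (%s) in stage %d".format(e.getMessage, stageId)
  }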

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
index b026f86..8007b54 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobWaiter.scala
@@ -64,7 +64,7 @@ private[spark] class JobWaiter[T](
 
   override def jobFailed(exception: Exception): Unit = synchronized {
     _jobFinished = true
-    jobResult = JobFailed(exception, None)
+    jobResult = JobFailed(exception, -1)
     this.notifyAll()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
new file mode 100644
index 0000000..353a486
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.util.concurrent.LinkedBlockingQueue
+
+import org.apache.spark.Logging
+
+/**
+ * Asynchronously passes SparkListenerEvents to registered SparkListeners.
+ *
+ * Until start() is called, all posted events are only buffered. Only after this listener bus
+ * has started will events be actually propagated to all attached listeners. This listener bus
+ * is stopped when it receives a SparkListenerShutdown event, which is posted using stop().
+ */
+private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+  private var queueFullErrorMessageLogged = false
+  private var started = false
+
+  /**
+   * Start sending events to attached listeners.
+   *
+   * This first sends out all buffered events posted before this listener bus has started, then
+   * listens for any additional events asynchronously while the listener bus is still running.
+   * This should only be called once.
+   */
+  def start() {
+    if (started) {
+      throw new IllegalStateException("Listener bus already started!")
+    }
+    started = true
+    new Thread("SparkListenerBus") {
+      setDaemon(true)
+      override def run() {
+        while (true) {
+          val event = eventQueue.take
+          if (event == SparkListenerShutdown) {
+            // Get out of the while loop and shutdown the daemon thread
+            return
+          }
+          postToAll(event)
+        }
+      }
+    }.start()
+  }
+
+  def post(event: SparkListenerEvent) {
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded && !queueFullErrorMessageLogged) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+      queueFullErrorMessageLogged = true
+    }
+  }
+
+  /**
+   * Waits until there are no more events in the queue, or until the specified time has elapsed.
+   * Used for testing only. Returns true if the queue has emptied and false if the specified time
+   * elapsed before the queue emptied.
+   */
+  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
+    val finishTime = System.currentTimeMillis + timeoutMillis
+    while (!eventQueue.isEmpty) {
+      if (System.currentTimeMillis > finishTime) {
+        return false
+      }
+      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+       * add overhead in the general case. */
+      Thread.sleep(10)
+    }
+    true
+  }
+
+  def stop() {
+    if (!started) {
+      throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
+    }
+    post(SparkListenerShutdown)
+  }
+}
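
The new bus above is essentially a bounded producer/consumer queue drained by a single daemon
thread. As a rough illustration of that pattern (the names MiniBus, Message and Shutdown are
made up here and are not part of the patch), a self-contained sketch:

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait Event
    case class Message(payload: String) extends Event
    case object Shutdown extends Event

    class MiniBus(capacity: Int = 10000) {
      private val queue = new LinkedBlockingQueue[Event](capacity)
      @volatile private var dropLogged = false

      // One daemon thread drains the queue until it sees the Shutdown sentinel.
      private val drainer = new Thread("mini-bus") {
        setDaemon(true)
        override def run() {
          while (true) {
            queue.take() match {
              case Shutdown => return
              case Message(text) => println("handled: " + text)
            }
          }
        }
      }

      def start() { drainer.start() }

      // offer() never blocks: if the queue is full the event is dropped, and the
      // problem is logged only once, mirroring the behavior in the patch above.
      def post(event: Event) {
        if (!queue.offer(event) && !dropLogged) {
          System.err.println("Dropping event: queue is full")
          dropLogged = true
        }
      }

      def stop() { post(Shutdown) }
    }

    object MiniBusDemo extends App {
      val bus = new MiniBus()
      bus.start()
      (1 to 5).foreach(i => bus.post(Message("event-" + i)))
      bus.stop()
      Thread.sleep(100) // give the daemon thread a moment to drain before the JVM exits
    }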

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 4bc13c2..187672c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -62,7 +62,7 @@ private[spark] class Pool(
   override def addSchedulable(schedulable: Schedulable) {
     schedulableQueue += schedulable
     schedulableNameToSchedulable(schedulable.name) = schedulable
-    schedulable.parent= this
+    schedulable.parent = this
   }
 
   override def removeSchedulable(schedulable: Schedulable) {

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
new file mode 100644
index 0000000..db76178
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import java.io.InputStream
+import java.net.URI
+
+import scala.io.Source
+
+import it.unimi.dsi.fastutil.io.FastBufferedInputStream
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.io.CompressionCodec
+import org.apache.spark.util.{JsonProtocol, Utils}
+
+/**
+ * An EventBus that replays logged events from persisted storage
+ */
+private[spark] class ReplayListenerBus(conf: SparkConf) extends SparkListenerBus with Logging {
+  private val compressed = conf.getBoolean("spark.eventLog.compress", false)
+
+  // Only used if compression is enabled
+  private lazy val compressionCodec = CompressionCodec.createCodec(conf)
+
+  /**
+   * Return a list of paths representing log files in the given directory.
+   */
+  private def getLogFilePaths(logDir: String, fileSystem: FileSystem): Array[Path] = {
+    val path = new Path(logDir)
+    if (!fileSystem.exists(path) || !fileSystem.getFileStatus(path).isDir) {
+      logWarning("Log path provided is not a valid directory: %s".format(logDir))
+      return Array[Path]()
+    }
+    val logStatus = fileSystem.listStatus(path)
+    if (logStatus == null || !logStatus.exists(!_.isDir)) {
+      logWarning("Log path provided contains no log files: %s".format(logDir))
+      return Array[Path]()
+    }
+    logStatus.filter(!_.isDir).map(_.getPath).sortBy(_.getName)
+  }
+
+  /**
+   * Replay each event in the order maintained in the given logs.
+   */
+  def replay(logDir: String): Boolean = {
+    val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
+    val logPaths = getLogFilePaths(logDir, fileSystem)
+    if (logPaths.length == 0) {
+      return false
+    }
+
+    logPaths.foreach { path =>
+      // Keep track of input streams at all levels to close them later
+      // This is necessary because an exception can occur in between stream initializations
+      var fileStream: Option[InputStream] = None
+      var bufferedStream: Option[InputStream] = None
+      var compressStream: Option[InputStream] = None
+      var currentLine = ""
+      try {
+        currentLine = "<not started>"
+        fileStream = Some(fileSystem.open(path))
+        bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
+        compressStream =
+          if (compressed) {
+            Some(compressionCodec.compressedInputStream(bufferedStream.get))
+          } else bufferedStream
+
+        // Parse each line as an event and post it to all attached listeners
+        val lines = Source.fromInputStream(compressStream.get).getLines()
+        lines.foreach { line =>
+          currentLine = line
+          postToAll(JsonProtocol.sparkEventFromJson(parse(line)))
+        }
+      } catch {
+        case e: Exception =>
+          logError("Exception in parsing Spark event log %s".format(path), e)
+          logError("Malformed line: %s\n".format(currentLine))
+      } finally {
+        fileStream.foreach(_.close())
+        bufferedStream.foreach(_.close())
+        compressStream.foreach(_.close())
+      }
+    }
+    fileSystem.close()
+    true
+  }
+}
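
The replay path above layers the input streams (raw file, then buffered, then optionally
decompressed) and parses one event per line, closing every layer even if construction fails
partway through. A rough standalone sketch of the same layering, using
java.util.zip.GZIPInputStream in place of Spark's CompressionCodec and a made-up file name:

    import java.io.{BufferedInputStream, FileInputStream, InputStream}
    import java.util.zip.GZIPInputStream

    import scala.io.Source

    object ReplaySketch {
      def readRecords(path: String, compressed: Boolean)(handle: String => Unit) {
        // Keep a handle on every stream layer so all of them can be closed,
        // even if an exception occurs between initializations.
        var file: Option[InputStream] = None
        var buffered: Option[InputStream] = None
        var decoded: Option[InputStream] = None
        try {
          file = Some(new FileInputStream(path))
          buffered = Some(new BufferedInputStream(file.get))
          decoded = if (compressed) Some(new GZIPInputStream(buffered.get)) else buffered
          // One record per line, handed to the caller as it is read
          Source.fromInputStream(decoded.get).getLines().foreach(handle)
        } finally {
          decoded.foreach(_.close())
          buffered.foreach(_.close())
          file.foreach(_.close())
        }
      }
    }

    // e.g. ReplaySketch.readRecords("events.log", compressed = false)(println)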

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 9590c03..d4eb0ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,33 +19,52 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 
+import scala.collection.Map
+import scala.collection.mutable
+
 import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{Distribution, Utils}
 
-sealed trait SparkListenerEvents
+sealed trait SparkListenerEvent
+
+case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
+  extends SparkListenerEvent
+
+case class SparkListenerStageCompleted(stageInfo: StageInfo) extends SparkListenerEvent
+
+case class SparkListenerTaskStart(stageId: Int, taskInfo: TaskInfo) extends SparkListenerEvent
 
-case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
-     extends SparkListenerEvents
+case class SparkListenerTaskGettingResult(taskInfo: TaskInfo) extends SparkListenerEvent
 
-case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerTaskEnd(
+    stageId: Int,
+    taskType: String,
+    reason: TaskEndReason,
+    taskInfo: TaskInfo,
+    taskMetrics: TaskMetrics)
+  extends SparkListenerEvent
 
-case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
+  extends SparkListenerEvent
 
-case class SparkListenerTaskGettingResult(
-  task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
+case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
 
-case class SparkListenerTaskEnd(task: Task[_], reason: TaskEndReason, taskInfo: TaskInfo,
-     taskMetrics: TaskMetrics) extends SparkListenerEvents
+case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(String, String)]])
+  extends SparkListenerEvent
 
-case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int],
-    properties: Properties = null) extends SparkListenerEvents
+case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+  extends SparkListenerEvent
 
-case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
-     extends SparkListenerEvents
+case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+  extends SparkListenerEvent
+
+case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 /** An event used in the listener to shutdown the listener daemon thread. */
-private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+private[spark] case object SparkListenerShutdown extends SparkListenerEvent
+
 
 /**
  * Interface for listening to events from the Spark scheduler.
@@ -87,97 +106,134 @@ trait SparkListener {
    */
   def onJobEnd(jobEnd: SparkListenerJobEnd) { }
 
+  /**
+   * Called when environment properties have been updated
+   */
+  def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { }
+
+  /**
+   * Called when a new block manager has joined
+   */
+  def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) { }
+
+  /**
+   * Called when an existing block manager has been removed
+   */
+  def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) { }
+
+  /**
+   * Called when an RDD is manually unpersisted by the application
+   */
+  def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD) { }
 }
 
 /**
  * Simple SparkListener that logs a few summary statistics when each stage completes
  */
 class StatsReportListener extends SparkListener with Logging {
+
+  import org.apache.spark.scheduler.StatsReportListener._
+
+  private val taskInfoMetrics = mutable.Buffer[(TaskInfo, TaskMetrics)]()
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+    val info = taskEnd.taskInfo
+    val metrics = taskEnd.taskMetrics
+    if (info != null && metrics != null) {
+      taskInfoMetrics += ((info, metrics))
+    }
+  }
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
-    import org.apache.spark.scheduler.StatsReportListener._
     implicit val sc = stageCompleted
-    this.logInfo("Finished stage: " + stageCompleted.stage)
-    showMillisDistribution("task runtime:", (info, _) => Some(info.duration))
+    this.logInfo("Finished stage: " + stageCompleted.stageInfo)
+    showMillisDistribution("task runtime:", (info, _) => Some(info.duration), taskInfoMetrics)
 
-    //shuffle write
+    // Shuffle write
     showBytesDistribution("shuffle bytes written:",
-      (_,metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten))
+      (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
 
-    //fetch & io
+    // Fetch & I/O
     showMillisDistribution("fetch wait time:",
-      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime))
+      (_, metric) => metric.shuffleReadMetrics.map(_.fetchWaitTime), taskInfoMetrics)
     showBytesDistribution("remote bytes read:",
-      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead))
-    showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize))
-
-    //runtime breakdown
+      (_, metric) => metric.shuffleReadMetrics.map(_.remoteBytesRead), taskInfoMetrics)
+    showBytesDistribution("task result size:",
+      (_, metric) => Some(metric.resultSize), taskInfoMetrics)
 
-    val runtimePcts = stageCompleted.stage.taskInfos.map{
-      case (info, metrics) => RuntimePercentage(info.duration, metrics)
+    // Runtime breakdown
+    val runtimePcts = taskInfoMetrics.map { case (info, metrics) =>
+      RuntimePercentage(info.duration, metrics)
     }
     showDistribution("executor (non-fetch) time pct: ",
-      Distribution(runtimePcts.map{_.executorPct * 100}), "%2.0f %%")
+      Distribution(runtimePcts.map(_.executorPct * 100)), "%2.0f %%")
     showDistribution("fetch wait time pct: ",
-      Distribution(runtimePcts.flatMap{_.fetchPct.map{_ * 100}}), "%2.0f %%")
-    showDistribution("other time pct: ", Distribution(runtimePcts.map{_.other * 100}), "%2.0f %%")
+      Distribution(runtimePcts.flatMap(_.fetchPct.map(_ * 100))), "%2.0f %%")
+    showDistribution("other time pct: ", Distribution(runtimePcts.map(_.other * 100)), "%2.0f %%")
+    taskInfoMetrics.clear()
   }
 
 }
 
 private[spark] object StatsReportListener extends Logging {
 
-  //for profiling, the extremes are more interesting
+  // For profiling, the extremes are more interesting
   val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
-  val probabilities = percentiles.map{_ / 100.0}
+  val probabilities = percentiles.map(_ / 100.0)
   val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%"
 
-  def extractDoubleDistribution(stage: SparkListenerStageCompleted,
-      getMetric: (TaskInfo,TaskMetrics) => Option[Double])
-    : Option[Distribution] = {
-    Distribution(stage.stage.taskInfos.flatMap {
-      case ((info,metric)) => getMetric(info, metric)})
+  def extractDoubleDistribution(
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+      getMetric: (TaskInfo, TaskMetrics) => Option[Double]): Option[Distribution] = {
+    Distribution(taskInfoMetrics.flatMap { case (info, metric) => getMetric(info, metric) })
   }
 
-  //is there some way to setup the types that I can get rid of this completely?
-  def extractLongDistribution(stage: SparkListenerStageCompleted,
-      getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    : Option[Distribution] = {
-    extractDoubleDistribution(stage, (info, metric) => getMetric(info,metric).map{_.toDouble})
+  // Is there some way to set up the types so that I can get rid of this completely?
+  def extractLongDistribution(
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)],
+      getMetric: (TaskInfo, TaskMetrics) => Option[Long]): Option[Distribution] = {
+    extractDoubleDistribution(
+      taskInfoMetrics,
+      (info, metric) => { getMetric(info, metric).map(_.toDouble) })
   }
 
   def showDistribution(heading: String, d: Distribution, formatNumber: Double => String) {
     val stats = d.statCounter
-    val quantiles = d.getQuantiles(probabilities).map{formatNumber}
+    val quantiles = d.getQuantiles(probabilities).map(formatNumber)
     logInfo(heading + stats)
     logInfo(percentilesHeader)
     logInfo("\t" + quantiles.mkString("\t"))
   }
 
-  def showDistribution(heading: String, dOpt: Option[Distribution], formatNumber: Double => String)
-  {
+  def showDistribution(
+      heading: String,
+      dOpt: Option[Distribution],
+      formatNumber: Double => String) {
     dOpt.foreach { d => showDistribution(heading, d, formatNumber)}
   }
 
   def showDistribution(heading: String, dOpt: Option[Distribution], format:String) {
-    def f(d:Double) = format.format(d)
+    def f(d: Double) = format.format(d)
     showDistribution(heading, dOpt, f _)
   }
 
   def showDistribution(
       heading: String,
       format: String,
-      getMetric: (TaskInfo, TaskMetrics) => Option[Double])
-      (implicit stage: SparkListenerStageCompleted) {
-    showDistribution(heading, extractDoubleDistribution(stage, getMetric), format)
+      getMetric: (TaskInfo, TaskMetrics) => Option[Double],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showDistribution(heading, extractDoubleDistribution(taskInfoMetrics, getMetric), format)
   }
 
-  def showBytesDistribution(heading:String, getMetric: (TaskInfo,TaskMetrics) => Option[Long])
-    (implicit stage: SparkListenerStageCompleted) {
-    showBytesDistribution(heading, extractLongDistribution(stage, getMetric))
+  def showBytesDistribution(
+      heading: String,
+      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
   }
 
   def showBytesDistribution(heading: String, dOpt: Option[Distribution]) {
-    dOpt.foreach{dist => showBytesDistribution(heading, dist)}
+    dOpt.foreach { dist => showBytesDistribution(heading, dist) }
   }
 
   def showBytesDistribution(heading: String, dist: Distribution) {
@@ -189,9 +245,11 @@ private[spark] object StatsReportListener extends Logging {
       (d => StatsReportListener.millisToString(d.toLong)): Double => String)
   }
 
-  def showMillisDistribution(heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long])
-    (implicit stage: SparkListenerStageCompleted) {
-    showMillisDistribution(heading, extractLongDistribution(stage, getMetric))
+  def showMillisDistribution(
+      heading: String,
+      getMetric: (TaskInfo, TaskMetrics) => Option[Long],
+      taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) {
+    showMillisDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric))
   }
 
   val seconds = 1000L
@@ -199,7 +257,7 @@ private[spark] object StatsReportListener extends Logging {
   val hours = minutes * 60
 
   /**
-   * reformat a time interval in milliseconds to a prettier format for output
+   * Reformat a time interval in milliseconds to a prettier format for output
    */
   def millisToString(ms: Long) = {
     val (size, units) =
@@ -221,8 +279,8 @@ private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Doubl
 private object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
     val denom = totalTime.toDouble
-    val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
-    val fetch = fetchTime.map{_ / denom}
+    val fetchTime = metrics.shuffleReadMetrics.map(_.fetchWaitTime)
+    val fetch = fetchTime.map(_ / denom)
     val exec = (metrics.executorRunTime - fetchTime.getOrElse(0L)) / denom
     val other = 1.0 - (exec + fetch.getOrElse(0d))
     RuntimePercentage(exec, fetch, other)
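
With the events now carrying plain identifiers and StageInfo instead of scheduler-internal
Stage and Task objects, the same listener can be driven by either the live bus or the replay
bus. A rough sketch of a listener written against the reworked interface follows; because
StageInfo is private[spark] in this patch, the sketch assumes it lives somewhere under
org.apache.spark (for example a test suite), and the class name and messages are made up:

    package org.apache.spark.scheduler

    class StageTimingListener extends SparkListener {
      override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
        val info = stageSubmitted.stageInfo
        println("Stage %d (%s) submitted with %d tasks".format(info.stageId, info.name, info.numTasks))
      }

      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        val info = stageCompleted.stageInfo
        // submissionTime and completionTime are Options, so compute the duration only
        // when both have been recorded
        val duration = for {
          start <- info.submissionTime
          end <- info.completionTime
        } yield end - start
        println("Stage %d finished in %s ms".format(info.stageId, duration.getOrElse("?")))
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        println("Task %d in stage %d ended: %s"
          .format(taskEnd.taskInfo.taskId, taskEnd.stageId, taskEnd.reason))
      }
    }

    // Attaching it to a running application (sc is an existing SparkContext):
    //   sc.addSparkListener(new StageTimingListener)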

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 17b1328..729e120 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -1,100 +1,67 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import java.util.concurrent.LinkedBlockingQueue
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
-
-import org.apache.spark.Logging
-
-/** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus extends Logging {
-  private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
-
-  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
-   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
-  private var queueFullErrorMessageLogged = false
-
-  // Create a new daemon thread to listen for events. This thread is stopped when it receives
-  // a SparkListenerShutdown event, using the stop method.
-  new Thread("SparkListenerBus") {
-    setDaemon(true)
-    override def run() {
-      while (true) {
-        val event = eventQueue.take
-        event match {
-          case stageSubmitted: SparkListenerStageSubmitted =>
-            sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
-          case stageCompleted: SparkListenerStageCompleted =>
-            sparkListeners.foreach(_.onStageCompleted(stageCompleted))
-          case jobStart: SparkListenerJobStart =>
-            sparkListeners.foreach(_.onJobStart(jobStart))
-          case jobEnd: SparkListenerJobEnd =>
-            sparkListeners.foreach(_.onJobEnd(jobEnd))
-          case taskStart: SparkListenerTaskStart =>
-            sparkListeners.foreach(_.onTaskStart(taskStart))
-          case taskGettingResult: SparkListenerTaskGettingResult =>
-            sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
-          case taskEnd: SparkListenerTaskEnd =>
-            sparkListeners.foreach(_.onTaskEnd(taskEnd))
-          case SparkListenerShutdown =>
-            // Get out of the while loop and shutdown the daemon thread
-            return
-          case _ =>
-        }
-      }
-    }
-  }.start()
-
-  def addListener(listener: SparkListener) {
-    sparkListeners += listener
-  }
-
-  def post(event: SparkListenerEvents) {
-    val eventAdded = eventQueue.offer(event)
-    if (!eventAdded && !queueFullErrorMessageLogged) {
-      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
-        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
-        "rate at which tasks are being started by the scheduler.")
-      queueFullErrorMessageLogged = true
-    }
-  }
-
-  /**
-   * Waits until there are no more events in the queue, or until the specified time has elapsed.
-   * Used for testing only. Returns true if the queue has emptied and false is the specified time
-   * elapsed before the queue emptied.
-   */
-  def waitUntilEmpty(timeoutMillis: Int): Boolean = {
-    val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty) {
-      if (System.currentTimeMillis > finishTime) {
-        return false
-      }
-      /* Sleep rather than using wait/notify, because this is used only for testing and wait/notify
-       * add overhead in the general case. */
-      Thread.sleep(10)
-    }
-    true
-  }
-
-  def stop(): Unit = post(SparkListenerShutdown)
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * A SparkListenerEvent bus that relays events to its listeners
+ */
+private[spark] trait SparkListenerBus {
+
+  // SparkListeners attached to this event bus
+  protected val sparkListeners = new ArrayBuffer[SparkListener]
+    with mutable.SynchronizedBuffer[SparkListener]
+
+  def addListener(listener: SparkListener) {
+    sparkListeners += listener
+  }
+
+  /**
+   * Post an event to all attached listeners. This does nothing if the event is
+   * SparkListenerShutdown.
+   */
+  protected def postToAll(event: SparkListenerEvent) {
+    event match {
+      case stageSubmitted: SparkListenerStageSubmitted =>
+        sparkListeners.foreach(_.onStageSubmitted(stageSubmitted))
+      case stageCompleted: SparkListenerStageCompleted =>
+        sparkListeners.foreach(_.onStageCompleted(stageCompleted))
+      case jobStart: SparkListenerJobStart =>
+        sparkListeners.foreach(_.onJobStart(jobStart))
+      case jobEnd: SparkListenerJobEnd =>
+        sparkListeners.foreach(_.onJobEnd(jobEnd))
+      case taskStart: SparkListenerTaskStart =>
+        sparkListeners.foreach(_.onTaskStart(taskStart))
+      case taskGettingResult: SparkListenerTaskGettingResult =>
+        sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
+      case taskEnd: SparkListenerTaskEnd =>
+        sparkListeners.foreach(_.onTaskEnd(taskEnd))
+      case environmentUpdate: SparkListenerEnvironmentUpdate =>
+        sparkListeners.foreach(_.onEnvironmentUpdate(environmentUpdate))
+      case blockManagerAdded: SparkListenerBlockManagerAdded =>
+        sparkListeners.foreach(_.onBlockManagerAdded(blockManagerAdded))
+      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
+        sparkListeners.foreach(_.onBlockManagerRemoved(blockManagerRemoved))
+      case unpersistRDD: SparkListenerUnpersistRDD =>
+        sparkListeners.foreach(_.onUnpersistRDD(unpersistRDD))
+      case SparkListenerShutdown =>
+    }
+  }
+}
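
Because dispatch now lives in this trait, the asynchronous LiveListenerBus and the
ReplayListenerBus share a single postToAll. As a rough illustration, a third, purely
synchronous bus (the kind of thing a test might use) only needs to expose that method; the
names below are illustrative and not part of the patch:

    package org.apache.spark.scheduler

    private[spark] class SyncListenerBus extends SparkListenerBus {
      // Deliver an event to all attached listeners on the calling thread, with no queue
      def postSync(event: SparkListenerEvent) {
        postToAll(event)
      }
    }

    // e.g. in a suite:
    //   val bus = new SyncListenerBus
    //   bus.addListener(new StatsReportListener)
    //   bus.postSync(SparkListenerJobStart(jobId = 0, stageIds = Seq(0)))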

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index 8f320e5..8115a7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -17,28 +17,25 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection._
-
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.storage.RDDInfo
 
 /**
  * Stores information about a stage to pass from the scheduler to SparkListeners.
- *
- * taskInfos stores the metrics for all tasks that have completed, including redundant, speculated
- * tasks.
  */
-class StageInfo(
-    stage: Stage,
-    val taskInfos: mutable.Buffer[(TaskInfo, TaskMetrics)] =
-    mutable.Buffer[(TaskInfo, TaskMetrics)]()
-) {
-  val stageId = stage.id
+private[spark]
+class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfo: RDDInfo) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   var completionTime: Option[Long] = None
-  val rddName = stage.rdd.name
-  val name = stage.name
-  val numPartitions = stage.numPartitions
-  val numTasks = stage.numTasks
   var emittedTaskSizeWarning = false
 }
+
+private[spark]
+object StageInfo {
+  def fromStage(stage: Stage): StageInfo = {
+    val rdd = stage.rdd
+    val rddName = Option(rdd.name).getOrElse(rdd.id.toString)
+    val rddInfo = new RDDInfo(rdd.id, rddName, rdd.partitions.size, rdd.getStorageLevel)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfo)
+  }
+}
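
Since StageInfo now takes its fields explicitly instead of reaching into a Stage, one can be
constructed directly, for example for tests or synthetic replay events. A rough sketch with
made-up ids, names and partition count, placed under org.apache.spark.scheduler because the
class is private[spark] in this patch:

    package org.apache.spark.scheduler

    import org.apache.spark.storage.{RDDInfo, StorageLevel}

    private[spark] object StageInfoSketch {
      def sample(): StageInfo = {
        // RDDInfo takes (id, name, number of partitions, storage level), as used by fromStage above
        val rddInfo = new RDDInfo(0, "sampleRDD", 4, StorageLevel.NONE)
        new StageInfo(0, "sample stage", 4, rddInfo)
      }
    }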

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
index ea3229b..308edb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocality.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler
 
 private[spark] object TaskLocality extends Enumeration {
-  // process local is expected to be used ONLY within tasksetmanager for now.
+  // Process local is expected to be used ONLY within TaskSetManager for now.
   val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
 
   type TaskLocality = Value

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index abff252..30bceb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -204,7 +204,7 @@ private[spark] class TaskSchedulerImpl(
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
-        executorGained(o.executorId, o.host)
+        executorAdded(o.executorId, o.host)
       }
     }
 
@@ -400,8 +400,8 @@ private[spark] class TaskSchedulerImpl(
     rootPool.executorLost(executorId, host)
   }
 
-  def executorGained(execId: String, host: String) {
-    dagScheduler.executorGained(execId, host)
+  def executorAdded(execId: String, host: String) {
+    dagScheduler.executorAdded(execId, host)
   }
 
   def getExecutorsAliveOnHost(host: String): Option[Set[String]] = synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/79d07d66/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ee4b65e..25b7472 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.deploy.{Command, ApplicationDescription}
+import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
 import org.apache.spark.util.Utils
@@ -26,8 +26,7 @@ import org.apache.spark.util.Utils
 private[spark] class SparkDeploySchedulerBackend(
     scheduler: TaskSchedulerImpl,
     sc: SparkContext,
-    masters: Array[String],
-    appName: String)
+    masters: Array[String])
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with AppClientListener
   with Logging {
@@ -49,8 +48,8 @@ private[spark] class SparkDeploySchedulerBackend(
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
-    val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command,
-      sparkHome, "http://" + sc.ui.appUIAddress)
+    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
+      sparkHome, sc.ui.appUIAddress, sc.eventLoggingInfo)
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()