You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/17 09:08:15 UTC

git commit: SPARK-2035: Store call stack for stages, display it on the UI.

Repository: spark
Updated Branches:
  refs/heads/master 8cd04c3ee -> 23a12ce20


SPARK-2035: Store call stack for stages, display it on the UI.

I'm not sure about the test -- I get a lot of unrelated failures for some reason. I'll try to sort it out. But hopefully the automation will test this for me if I send a pull request :).

I'll attach a demo HTML in [Jira](https://issues.apache.org/jira/browse/SPARK-2035).

Author: Daniel Darabos <da...@gmail.com>
Author: Patrick Wendell <pw...@gmail.com>

Closes #981 from darabos/darabos-call-stack and squashes the following commits:

f7c6bfa [Daniel Darabos] Fix bad merge. I undid 83c226d454 by Doris.
3d0a48d [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
b857849 [Daniel Darabos] Style: Break long line.
ecb5690 [Daniel Darabos] Include the last Spark method in the full stack trace. Otherwise it is not visible if the stage name is overridden.
d00a85b [Patrick Wendell] Make call sites for stages non-optional and well defined
b9eba24 [Daniel Darabos] Make StageInfo.details non-optional. Add JSON serialization code for the new field. Verify JSON backward compatibility.
4312828 [Daniel Darabos] Remove Mima excludes for CallSite. They should be unnecessary now, with SPARK-2070 fixed.
0920750 [Daniel Darabos] Merge remote-tracking branch 'upstream/master' into darabos-call-stack
a4b1faf [Daniel Darabos] Add Mima exclusions for the CallSite changes it has picked up. They are private methods/classes, so we ought to be safe.
932f810 [Daniel Darabos] Use empty CallSite instead of null in DAGSchedulerSuite. Outside of testing, this parameter always originates in SparkContext.scala, and will never be null.
ccd89d1 [Daniel Darabos] Fix long lines.
ac173e4 [Daniel Darabos] Hide "show details" if there are no details to show.
6182da6 [Daniel Darabos] Set a configurable limit on maximum call stack depth. It can be useful in memory-constrained situations with large numbers of stages.
8fe2e34 [Daniel Darabos] Store call stack for stages, display it on the UI.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23a12ce2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23a12ce2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23a12ce2

Branch: refs/heads/master
Commit: 23a12ce20c55653b08b16e6159ab31d2ca88acf1
Parents: 8cd04c3
Author: Daniel Darabos <da...@gmail.com>
Authored: Tue Jun 17 00:08:05 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Tue Jun 17 00:08:05 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/ui/static/webui.css        | 21 +++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   | 18 ++++++-----
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  6 ++--
 .../org/apache/spark/scheduler/ActiveJob.scala  |  3 +-
 .../apache/spark/scheduler/DAGScheduler.scala   | 24 ++++++++-------
 .../spark/scheduler/DAGSchedulerEvent.scala     |  3 +-
 .../org/apache/spark/scheduler/Stage.scala      | 11 +++++--
 .../org/apache/spark/scheduler/StageInfo.scala  |  9 ++++--
 .../org/apache/spark/ui/jobs/StageTable.scala   | 10 +++++-
 .../org/apache/spark/util/JsonProtocol.scala    |  4 ++-
 .../scala/org/apache/spark/util/Utils.scala     | 32 ++++++++++----------
 .../apache/spark/SparkContextInfoSuite.scala    |  2 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |  7 +++--
 .../ui/jobs/JobProgressListenerSuite.scala      |  4 +--
 .../apache/spark/util/JsonProtocolSuite.scala   | 14 ++++++++-
 15 files changed, 115 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/resources/org/apache/spark/ui/static/webui.css
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 599c3ac..a8bc141 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -87,3 +87,24 @@ span.kill-link {
 span.kill-link a {
   color: gray;
 }
+
+span.expand-details {
+  font-size: 10pt;
+  cursor: pointer;
+  color: grey;
+  float: right;
+}
+
+.stage-details {
+  max-height: 100px;
+  overflow-y: auto;
+  margin: 0;
+  transition: max-height 0.5s ease-out, padding 0.5s ease-out;
+}
+
+.stage-details.collapsed {
+  max-height: 0;
+  padding-top: 0;
+  padding-bottom: 0;
+  border: none;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 35970c2..0678bdd 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
+import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -1036,9 +1036,11 @@ class SparkContext(config: SparkConf) extends Logging {
    * Capture the current user callsite and return a formatted version for printing. If the user
    * has overridden the call site, this will return the user's version.
    */
-  private[spark] def getCallSite(): String = {
-    val defaultCallSite = Utils.getCallSiteInfo
-    Option(getLocalProperty("externalCallSite")).getOrElse(defaultCallSite.toString)
+  private[spark] def getCallSite(): CallSite = {
+    Option(getLocalProperty("externalCallSite")) match {
+      case Some(callSite) => CallSite(callSite, long = "")
+      case None => Utils.getCallSite
+    }
   }
 
   /**
@@ -1058,11 +1060,11 @@ class SparkContext(config: SparkConf) extends Logging {
     }
     val callSite = getCallSite
     val cleanedFunc = clean(func)
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
       resultHandler, localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     rdd.doCheckpoint()
   }
 
@@ -1143,11 +1145,11 @@ class SparkContext(config: SparkConf) extends Logging {
       evaluator: ApproximateEvaluator[U, R],
       timeout: Long): PartialResult[R] = {
     val callSite = getCallSite
-    logInfo("Starting job: " + callSite)
+    logInfo("Starting job: " + callSite.short)
     val start = System.nanoTime
     val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
       localProperties.get)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
+    logInfo("Job finished: " + callSite.short + ", took " + (System.nanoTime - start) / 1e9 + " s")
     result
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 446f369..27cc60d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -40,7 +40,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{BoundedPriorityQueue, Utils}
+import org.apache.spark.util.{BoundedPriorityQueue, CallSite, Utils}
 import org.apache.spark.util.collection.OpenHashMap
 import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
 
@@ -1189,8 +1189,8 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
-  @transient private[spark] val creationSiteInfo = Utils.getCallSiteInfo
-  private[spark] def getCreationSite: String = Option(creationSiteInfo).getOrElse("").toString
+  @transient private[spark] val creationSite = Utils.getCallSite
+  private[spark] def getCreationSite: String = Option(creationSite).map(_.short).getOrElse("")
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
index 9257f48..b755d8f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ActiveJob.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.util.Properties
 
 import org.apache.spark.TaskContext
+import org.apache.spark.util.CallSite
 
 /**
  * Tracks information about an active job in the DAGScheduler.
@@ -29,7 +30,7 @@ private[spark] class ActiveJob(
     val finalStage: Stage,
     val func: (TaskContext, Iterator[_]) => _,
     val partitions: Array[Int],
-    val callSite: String,
+    val callSite: CallSite,
     val listener: JobListener,
     val properties: Properties) {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 3c85b5a..b3ebaa5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -38,7 +38,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerMaster, RDDBlockId}
-import org.apache.spark.util.{SystemClock, Clock, Utils}
+import org.apache.spark.util.{CallSite, SystemClock, Clock, Utils}
 
 /**
  * The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
@@ -195,7 +195,9 @@ class DAGScheduler(
       case Some(stage) => stage
       case None =>
         val stage =
-          newOrUsedStage(shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId)
+          newOrUsedStage(
+            shuffleDep.rdd, shuffleDep.rdd.partitions.size, shuffleDep, jobId,
+            shuffleDep.rdd.creationSite)
         shuffleToMapStage(shuffleDep.shuffleId) = stage
         stage
     }
@@ -212,7 +214,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: Option[ShuffleDependency[_, _, _]],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val id = nextStageId.getAndIncrement()
@@ -235,7 +237,7 @@ class DAGScheduler(
       numTasks: Int,
       shuffleDep: ShuffleDependency[_, _, _],
       jobId: Int,
-      callSite: Option[String] = None)
+      callSite: CallSite)
     : Stage =
   {
     val stage = newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)
@@ -413,7 +415,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null): JobWaiter[U] =
@@ -443,7 +445,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       partitions: Seq[Int],
-      callSite: String,
+      callSite: CallSite,
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit,
       properties: Properties = null)
@@ -452,7 +454,7 @@ class DAGScheduler(
     waiter.awaitResult() match {
       case JobSucceeded => {}
       case JobFailed(exception: Exception) =>
-        logInfo("Failed to run " + callSite)
+        logInfo("Failed to run " + callSite.short)
         throw exception
     }
   }
@@ -461,7 +463,7 @@ class DAGScheduler(
       rdd: RDD[T],
       func: (TaskContext, Iterator[T]) => U,
       evaluator: ApproximateEvaluator[U, R],
-      callSite: String,
+      callSite: CallSite,
       timeout: Long,
       properties: Properties = null)
     : PartialResult[R] =
@@ -666,7 +668,7 @@ class DAGScheduler(
       func: (TaskContext, Iterator[_]) => _,
       partitions: Array[Int],
       allowLocal: Boolean,
-      callSite: String,
+      callSite: CallSite,
       listener: JobListener,
       properties: Properties = null)
   {
@@ -674,7 +676,7 @@ class DAGScheduler(
     try {
       // New stage creation may throw an exception if, for example, jobs are run on a
       // HadoopRDD whose underlying HDFS files have been deleted.
-      finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
+      finalStage = newStage(finalRDD, partitions.size, None, jobId, callSite)
     } catch {
       case e: Exception =>
         logWarning("Creating new stage failed due to exception - job: " + jobId, e)
@@ -685,7 +687,7 @@ class DAGScheduler(
       val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
       clearCacheLocs()
       logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(
-        job.jobId, callSite, partitions.length, allowLocal))
+        job.jobId, callSite.short, partitions.length, allowLocal))
       logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")
       logInfo("Parents of final stage: " + finalStage.parents)
       logInfo("Missing parents: " + getMissingParentStages(finalStage))

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 23f5744..2b6f7e4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -25,6 +25,7 @@ import scala.language.existentials
 import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.CallSite
 
 /**
  * Types of events that can be handled by the DAGScheduler. The DAGScheduler uses an event queue
@@ -40,7 +41,7 @@ private[scheduler] case class JobSubmitted(
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     allowLocal: Boolean,
-    callSite: String,
+    callSite: CallSite,
     listener: JobListener,
     properties: Properties = null)
   extends DAGSchedulerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 3bf9713..9a4be43 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.util.CallSite
 
 /**
  * A stage is a set of independent tasks all computing the same function that need to run as part
@@ -35,6 +36,11 @@ import org.apache.spark.storage.BlockManagerId
  * Each Stage also has a jobId, identifying the job that first submitted the stage.  When FIFO
  * scheduling is used, this allows Stages from earlier jobs to be computed first or recovered
  * faster on failure.
+ *
+ * The callSite provides a location in user code which relates to the stage. For a shuffle map
+ * stage, the callSite gives the user code that created the RDD being shuffled. For a result
+ * stage, the callSite gives the user code that executes the associated action (e.g. count()).
+ *
  */
 private[spark] class Stage(
     val id: Int,
@@ -43,7 +49,7 @@ private[spark] class Stage(
     val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
     val parents: List[Stage],
     val jobId: Int,
-    callSite: Option[String])
+    val callSite: CallSite)
   extends Logging {
 
   val isShuffleMap = shuffleDep.isDefined
@@ -100,7 +106,8 @@ private[spark] class Stage(
     id
   }
 
-  val name = callSite.getOrElse(rdd.getCreationSite)
+  val name = callSite.short
+  val details = callSite.long
 
   override def toString = "Stage " + id
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
index b42e231..7644e3f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -25,7 +25,12 @@ import org.apache.spark.storage.RDDInfo
  * Stores information about a stage to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
-class StageInfo(val stageId: Int, val name: String, val numTasks: Int, val rddInfos: Seq[RDDInfo]) {
+class StageInfo(
+    val stageId: Int,
+    val name: String,
+    val numTasks: Int,
+    val rddInfos: Seq[RDDInfo],
+    val details: String) {
   /** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
   var submissionTime: Option[Long] = None
   /** Time when all tasks in the stage completed or when the stage was cancelled. */
@@ -52,6 +57,6 @@ private[spark] object StageInfo {
   def fromStage(stage: Stage): StageInfo = {
     val ancestorRddInfos = stage.rdd.getNarrowAncestors.map(RDDInfo.fromRdd)
     val rddInfos = Seq(RDDInfo.fromRdd(stage.rdd)) ++ ancestorRddInfos
-    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos)
+    new StageInfo(stage.id, stage.name, stage.numTasks, rddInfos, stage.details)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 153434a..a3f824a 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -91,9 +91,17 @@ private[ui] class StageTableBase(
         {s.name}
       </a>
 
+    val details = if (s.details.nonEmpty) (
+      <span onclick="this.parentNode.querySelector('.stage-details').classList.toggle('collapsed')"
+            class="expand-details">
+        +show details
+      </span>
+      <pre class="stage-details collapsed">{s.details}</pre>
+    )
+
     listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
-      .getOrElse(<div> {killLink}{nameLink}</div>)
+      .getOrElse(<div>{killLink} {nameLink} {details}</div>)
   }
 
   protected def stageRow(s: StageInfo): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 0982508..7cecbfe 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -184,6 +184,7 @@ private[spark] object JsonProtocol {
     ("Stage Name" -> stageInfo.name) ~
     ("Number of Tasks" -> stageInfo.numTasks) ~
     ("RDD Info" -> rddInfo) ~
+    ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
     ("Failure Reason" -> failureReason) ~
@@ -469,12 +470,13 @@ private[spark] object JsonProtocol {
     val stageName = (json \ "Stage Name").extract[String]
     val numTasks = (json \ "Number of Tasks").extract[Int]
     val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+    val details = (json \ "Details").extractOpt[String].getOrElse("")
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
     val emittedTaskSizeWarning = (json \ "Emitted Task Size Warning").extract[Boolean]
 
-    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos)
+    val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 4ce28bb..a2454e1 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -43,6 +43,9 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
+/** CallSite represents a place in user code. It can have a short and a long form. */
+private[spark] case class CallSite(val short: String, val long: String)
+
 /**
  * Various utility methods used by Spark.
  */
@@ -799,21 +802,12 @@ private[spark] object Utils extends Logging {
    */
   private val SPARK_CLASS_REGEX = """^org\.apache\.spark(\.api\.java)?(\.util)?(\.rdd)?\.[A-Z]""".r
 
-  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
-                                    val firstUserLine: Int, val firstUserClass: String) {
-
-    /** Returns a printable version of the call site info suitable for logs. */
-    override def toString = {
-      "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine)
-    }
-  }
-
   /**
    * When called inside a class in the spark package, returns the name of the user code class
    * (outside the spark package) that called into Spark, as well as which Spark method they called.
    * This is used, for example, to tell users where in their code each RDD got created.
    */
-  def getCallSiteInfo: CallSiteInfo = {
+  def getCallSite: CallSite = {
     val trace = Thread.currentThread.getStackTrace()
       .filterNot(_.getMethodName.contains("getStackTrace"))
 
@@ -824,11 +818,11 @@ private[spark] object Utils extends Logging {
     var lastSparkMethod = "<unknown>"
     var firstUserFile = "<unknown>"
     var firstUserLine = 0
-    var finished = false
-    var firstUserClass = "<unknown>"
+    var insideSpark = true
+    var callStack = new ArrayBuffer[String]() :+ "<unknown>"
 
     for (el <- trace) {
-      if (!finished) {
+      if (insideSpark) {
         if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName).isDefined) {
           lastSparkMethod = if (el.getMethodName == "<init>") {
             // Spark method is a constructor; get its class name
@@ -836,15 +830,21 @@ private[spark] object Utils extends Logging {
           } else {
             el.getMethodName
           }
+          callStack(0) = el.toString // Put last Spark method on top of the stack trace.
         } else {
           firstUserLine = el.getLineNumber
           firstUserFile = el.getFileName
-          firstUserClass = el.getClassName
-          finished = true
+          callStack += el.toString
+          insideSpark = false
         }
+      } else {
+        callStack += el.toString
       }
     }
-    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
+    val callStackDepth = System.getProperty("spark.callstack.depth", "20").toInt
+    CallSite(
+      short = "%s at %s:%s".format(lastSparkMethod, firstUserFile, firstUserLine),
+      long = callStack.take(callStackDepth).mkString("\n"))
   }
 
   /** Return a string containing part of a file from byte 'start' to 'end'. */

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
index cd3887d..1fde4ba 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextInfoSuite.scala
@@ -70,7 +70,7 @@ package object testPackage extends Assertions {
   def runCallSiteTest(sc: SparkContext) {
     val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val rddCreationSite = rdd.getCreationSite
-    val curCallSite = sc.getCallSite() // note: 2 lines after definition of "rdd"
+    val curCallSite = sc.getCallSite().short // note: 2 lines after definition of "rdd"
 
     val rddCreationLine = rddCreationSite match {
       case CALL_SITE_REGEX(func, file, line) => {

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 7506d56..4536832 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.apache.spark.util.CallSite
 
 class BuggyDAGEventProcessActor extends Actor {
   val state = 0
@@ -211,7 +212,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       allowLocal: Boolean = false,
       listener: JobListener = jobListener): Int = {
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, null, listener))
+    runEvent(JobSubmitted(jobId, rdd, func, partitions, allowLocal, CallSite("", ""), listener))
     jobId
   }
 
@@ -251,7 +252,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty
   }
@@ -265,7 +266,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
       override def toString = "DAGSchedulerSuite Local RDD"
     }
     val jobId = scheduler.nextJobId.getAndIncrement()
-    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, null, jobListener))
+    runEvent(JobSubmitted(jobId, rdd, jobComputeFunc, Array(0), true, CallSite("", ""), jobListener))
     assert(results.size == 0)
     assertDataStructuresEmpty
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 91b4c7b..c3a14f4 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -32,12 +32,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
     val listener = new JobProgressListener(conf)
 
     def createStageStartEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageSubmitted(stageInfo)
     }
 
     def createStageEndEvent(stageId: Int) = {
-      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null)
+      val stageInfo = new StageInfo(stageId, stageId.toString, 0, null, "")
       SparkListenerStageCompleted(stageInfo)
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23a12ce2/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 3031015..f72389b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -117,6 +117,17 @@ class JsonProtocolSuite extends FunSuite {
     testBlockId(StreamBlockId(1, 2L))
   }
 
+  test("Backward compatibility") {
+    // StageInfo.details was added after 1.0.0.
+    val info = makeStageInfo(1, 2, 3, 4L, 5L)
+    assert(info.details.nonEmpty)
+    val newJson = JsonProtocol.stageInfoToJson(info)
+    val oldJson = newJson.removeField { case (field, _) => field == "Details" }
+    val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
+    assert(info.name === newInfo.name)
+    assert("" === newInfo.details)
+  }
+
 
   /** -------------------------- *
    | Helper test running methods |
@@ -235,6 +246,7 @@ class JsonProtocolSuite extends FunSuite {
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
     }
+    assert(info1.details === info2.details)
   }
 
   private def assertEquals(info1: RDDInfo, info2: RDDInfo) {
@@ -438,7 +450,7 @@ class JsonProtocolSuite extends FunSuite {
 
   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (1 to a % 5).map { i => makeRddInfo(a % i, b % i, c % i, d % i, e % i) }
-    new StageInfo(a, "greetings", b, rddInfos)
+    new StageInfo(a, "greetings", b, rddInfos, "details")
   }
 
   private def makeTaskInfo(a: Long, b: Int, c: Long) = {