Posted to commits@spark.apache.org by pw...@apache.org on 2014/11/24 22:18:31 UTC

[1/2] spark git commit: [SPARK-4145] Web UI job pages

Repository: spark
Updated Branches:
  refs/heads/master dd1c9cb36 -> 4a90276ab


http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 50f4205..0bc9492 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -47,7 +47,12 @@ class JsonProtocolSuite extends FunSuite {
     val taskEndWithOutput = SparkListenerTaskEnd(1, 0, "ResultTask", Success,
       makeTaskInfo(123L, 234, 67, 345L, false),
       makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true))
-    val jobStart = SparkListenerJobStart(10, Seq[Int](1, 2, 3, 4), properties)
+    val jobStart = {
+      val stageIds = Seq[Int](1, 2, 3, 4)
+      val stageInfos = stageIds.map(x =>
+        makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
+      SparkListenerJobStart(10, stageInfos, properties)
+    }
     val jobEnd = SparkListenerJobEnd(20, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
       "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
@@ -224,6 +229,19 @@ class JsonProtocolSuite extends FunSuite {
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
+  test("SparkListenerJobStart backward compatibility") {
+    // Prior to Spark 1.2.0, SparkListenerJobStart did not have a "Stage Infos" property.
+    val stageIds = Seq[Int](1, 2, 3, 4)
+    val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
+    val dummyStageInfos =
+      stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+    val jobStart = SparkListenerJobStart(10, stageInfos, properties)
+    val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
+    val expectedJobStart =
+      SparkListenerJobStart(10, dummyStageInfos, properties)
+    assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -306,7 +324,7 @@ class JsonProtocolSuite extends FunSuite {
       case (e1: SparkListenerJobStart, e2: SparkListenerJobStart) =>
         assert(e1.jobId === e2.jobId)
         assert(e1.properties === e2.properties)
-        assertSeqEquals(e1.stageIds, e2.stageIds, (i1: Int, i2: Int) => assert(i1 === i2))
+        assert(e1.stageIds === e2.stageIds)
       case (e1: SparkListenerJobEnd, e2: SparkListenerJobEnd) =>
         assert(e1.jobId === e2.jobId)
         assertEquals(e1.jobResult, e2.jobResult)
@@ -1051,6 +1069,260 @@ class JsonProtocolSuite extends FunSuite {
       |{
       |  "Event": "SparkListenerJobStart",
       |  "Job ID": 10,
+      |  "Stage Infos": [
+      |    {
+      |      "Stage ID": 1,
+      |      "Stage Attempt ID": 0,
+      |      "Stage Name": "greetings",
+      |      "Number of Tasks": 200,
+      |      "RDD Info": [
+      |        {
+      |          "RDD ID": 1,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 200,
+      |          "Number of Cached Partitions": 300,
+      |          "Memory Size": 400,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 500
+      |        }
+      |      ],
+      |      "Details": "details",
+      |      "Accumulables": [
+      |        {
+      |          "ID": 2,
+      |          "Name": " Accumulable 2",
+      |          "Update": "delta2",
+      |          "Value": "val2"
+      |        },
+      |        {
+      |          "ID": 1,
+      |          "Name": " Accumulable 1",
+      |          "Update": "delta1",
+      |          "Value": "val1"
+      |        }
+      |      ]
+      |    },
+      |    {
+      |      "Stage ID": 2,
+      |      "Stage Attempt ID": 0,
+      |      "Stage Name": "greetings",
+      |      "Number of Tasks": 400,
+      |      "RDD Info": [
+      |        {
+      |          "RDD ID": 2,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 400,
+      |          "Number of Cached Partitions": 600,
+      |          "Memory Size": 800,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 1000
+      |        },
+      |        {
+      |          "RDD ID": 3,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 401,
+      |          "Number of Cached Partitions": 601,
+      |          "Memory Size": 801,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 1001
+      |        }
+      |      ],
+      |      "Details": "details",
+      |      "Accumulables": [
+      |        {
+      |          "ID": 2,
+      |          "Name": " Accumulable 2",
+      |          "Update": "delta2",
+      |          "Value": "val2"
+      |        },
+      |        {
+      |          "ID": 1,
+      |          "Name": " Accumulable 1",
+      |          "Update": "delta1",
+      |          "Value": "val1"
+      |        }
+      |      ]
+      |    },
+      |    {
+      |      "Stage ID": 3,
+      |      "Stage Attempt ID": 0,
+      |      "Stage Name": "greetings",
+      |      "Number of Tasks": 600,
+      |      "RDD Info": [
+      |        {
+      |          "RDD ID": 3,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 600,
+      |          "Number of Cached Partitions": 900,
+      |          "Memory Size": 1200,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 1500
+      |        },
+      |        {
+      |          "RDD ID": 4,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 601,
+      |          "Number of Cached Partitions": 901,
+      |          "Memory Size": 1201,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 1501
+      |        },
+      |        {
+      |          "RDD ID": 5,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 602,
+      |          "Number of Cached Partitions": 902,
+      |          "Memory Size": 1202,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 1502
+      |        }
+      |      ],
+      |      "Details": "details",
+      |      "Accumulables": [
+      |        {
+      |          "ID": 2,
+      |          "Name": " Accumulable 2",
+      |          "Update": "delta2",
+      |          "Value": "val2"
+      |        },
+      |        {
+      |          "ID": 1,
+      |          "Name": " Accumulable 1",
+      |          "Update": "delta1",
+      |          "Value": "val1"
+      |        }
+      |      ]
+      |    },
+      |    {
+      |      "Stage ID": 4,
+      |      "Stage Attempt ID": 0,
+      |      "Stage Name": "greetings",
+      |      "Number of Tasks": 800,
+      |      "RDD Info": [
+      |        {
+      |          "RDD ID": 4,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 800,
+      |          "Number of Cached Partitions": 1200,
+      |          "Memory Size": 1600,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 2000
+      |        },
+      |        {
+      |          "RDD ID": 5,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 801,
+      |          "Number of Cached Partitions": 1201,
+      |          "Memory Size": 1601,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 2001
+      |        },
+      |        {
+      |          "RDD ID": 6,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 802,
+      |          "Number of Cached Partitions": 1202,
+      |          "Memory Size": 1602,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 2002
+      |        },
+      |        {
+      |          "RDD ID": 7,
+      |          "Name": "mayor",
+      |          "Storage Level": {
+      |            "Use Disk": true,
+      |            "Use Memory": true,
+      |            "Use Tachyon": false,
+      |            "Deserialized": true,
+      |            "Replication": 1
+      |          },
+      |          "Number of Partitions": 803,
+      |          "Number of Cached Partitions": 1203,
+      |          "Memory Size": 1603,
+      |          "Tachyon Size": 0,
+      |          "Disk Size": 2003
+      |        }
+      |      ],
+      |      "Details": "details",
+      |      "Accumulables": [
+      |        {
+      |          "ID": 2,
+      |          "Name": " Accumulable 2",
+      |          "Update": "delta2",
+      |          "Value": "val2"
+      |        },
+      |        {
+      |          "ID": 1,
+      |          "Name": " Accumulable 1",
+      |          "Update": "delta1",
+      |          "Value": "val1"
+      |        }
+      |      ]
+      |    }
+      |  ],
       |  "Stage IDs": [
       |    1,
       |    2,




[2/2] spark git commit: [SPARK-4145] Web UI job pages

Posted by pw...@apache.org.
[SPARK-4145] Web UI job pages

This PR adds two new pages to the Spark Web UI:

- A jobs overview page, which shows details on running / completed / failed jobs.
- A job details page, which displays information on an individual job's stages.

The jobs overview page is now the default UI homepage; the old homepage is still accessible at `/stages`.

### Screenshots

#### New UI homepage

![image](https://cloud.githubusercontent.com/assets/50748/5119035/fd0a69e6-701f-11e4-89cb-db7e9705714f.png)

#### Job details page

(This is effectively a per-job version of the stages page that can be extended later with other things, such as DAG visualizations)

![image](https://cloud.githubusercontent.com/assets/50748/5134910/50b340d4-70c7-11e4-88e1-6b73237ea7c8.png)

### Key changes in this PR

- Rename `JobProgressPage` to `AllStagesPage`
- Expose `StageInfo` objects in the `SparkListenerJobStart` event; add backwards-compatibility tests to `JsonProtocol` (see the sketch after this list).
- Add additional data structures to `JobProgressListener` to map from stages to jobs.
- Add several fields to `JobUIData`.
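
For listener authors, the practical effect of the `SparkListenerJobStart` change is that the event now carries full `StageInfo` objects while still exposing a derived `stageIds` field. Below is a minimal sketch of a listener that reads both; the `JobStartLogger` name and its `println` reporting are illustrative, not part of this patch:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    class JobStartLogger extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        // New in this patch: full StageInfo objects for the job's stages.
        val plannedTasks = jobStart.stageInfos.map(_.numTasks).sum
        // Still available to existing listeners: the derived stage IDs.
        val ids = jobStart.stageIds.mkString(", ")
        println(s"Job ${jobStart.jobId} starting stages [$ids] (~$plannedTasks tasks)")
      }
    }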

I also added ~150 lines of Selenium tests as I uncovered UI issues while developing this patch.

### Limitations

If a job contains stages that aren't run, then its overall progress bar may underestimate the job's total progress; in other words, a completed job may appear to have a progress bar that's not at 100%.

If stages or tasks fail, then the progress bar will not go backwards to reflect the true amount of remaining work.
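
To make the first of these limitations concrete: the jobs table divides completed tasks by `numTasks - numSkippedTasks` (see `makeProgressBar` and `JobProgressListener` in the diff below), and `numSkippedTasks` is only updated when the job ends. A sketch with illustrative numbers:

    // Illustrative numbers: the job start event references three stages of
    // 100 tasks each, one of which never runs because its output is already
    // available from an earlier job.
    val numTasks = 300           // summed from the start event's StageInfos
    var numSkippedTasks = 0      // only incremented in onJobEnd
    val numCompletedTasks = 200  // tasks from the two stages that actually ran

    // While the job runs, the denominator still counts the stage that will
    // be skipped, so the bar underestimates true progress:
    val running = numCompletedTasks.toDouble / (numTasks - numSkippedTasks)
    // running = 200.0 / 300 = 0.666...

    // Once onJobEnd marks the pending stage as skipped, the bar catches up:
    numSkippedTasks = 100
    val done = numCompletedTasks.toDouble / (numTasks - numSkippedTasks)
    // done = 200.0 / 200 = 1.0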

Author: Josh Rosen <jo...@databricks.com>

Closes #3009 from JoshRosen/job-page and squashes the following commits:

eb05e90 [Josh Rosen] Disable kill button in completed stages tables.
f00c851 [Josh Rosen] Fix JsonProtocol compatibility
b89c258 [Josh Rosen] More JSON protocol backwards-compatibility fixes.
ff804cd [Josh Rosen] Don't write "Stage Ids" field in JobStartEvent JSON.
6f17f3f [Josh Rosen] Only store StageInfos in SparkListenerJobStart event.
2bbf41a [Josh Rosen] Update job progress bar to reflect skipped tasks/stages.
61c265a [Josh Rosen] Add “skipped stages” table; only display non-empty tables.
1f45d44 [Josh Rosen] Incorporate a bunch of minor review feedback.
0b77e3e [Josh Rosen] More bug fixes for phantom stages.
034aa8d [Josh Rosen] Use `.max()` to find result stage for job.
eebdc2c [Josh Rosen] Don’t display pending stages for completed jobs.
67080ba [Josh Rosen] Ensure that "phantom stages" don't cause memory leaks.
7d10b97 [Josh Rosen] Merge remote-tracking branch 'apache/master' into job-page
d69c775 [Josh Rosen] Fix table sorting on all jobs page.
5eb39dc [Josh Rosen] Add pending stages table to job page.
f2a15da [Josh Rosen] Add status field to job details page.
171b53c [Josh Rosen] Move `startTime` to the start of SparkContext.
e2f2c43 [Josh Rosen] Fix sorting of stages in job details page.
8955f4c [Josh Rosen] Display information for pending stages on jobs page.
8ab6c28 [Josh Rosen] Compute numTasks from job start stage infos.
5884f91 [Josh Rosen] Add StageInfos to SparkListenerJobStart event.
79793cd [Josh Rosen] Track indices of completed stage to avoid overcounting when failures occur.
d62ea7b [Josh Rosen] Add failing Selenium test for stage overcounting issue.
1145c60 [Josh Rosen] Display text instead of progress bar for stages.
3d0a007 [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page
8a2351b [Josh Rosen] Add help tooltip to Spark Jobs page.
b7bf30e [Josh Rosen] Add stages progress bar; fix bug where active stages show as completed.
4846ce4 [Josh Rosen] Hide "(Job Group)" if no jobs were submitted in job groups.
4d58e55 [Josh Rosen] Change label to "Tasks (for all stages)"
85e9c85 [Josh Rosen] Extract startTime into separate variable.
1cf4987 [Josh Rosen] Fix broken kill links; add Selenium test to avoid future regressions.
56701fa [Josh Rosen] Move last stage name / description logic out of markup.
a475ea1 [Josh Rosen] Add progress bars to jobs page.
45343b8 [Josh Rosen] More comments
4b206fb [Josh Rosen] Merge remote-tracking branch 'origin/master' into job-page
bfce2b9 [Josh Rosen] Address review comments, except for progress bar.
4487dcb [Josh Rosen] [SPARK-4145] Web UI job pages
2568a6c [Josh Rosen] Rename JobProgressPage to AllStagesPage:


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a90276a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a90276a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a90276a

Branch: refs/heads/master
Commit: 4a90276ab22d6989dffb2ee2d8118d9253365646
Parents: dd1c9cb
Author: Josh Rosen <jo...@databricks.com>
Authored: Mon Nov 24 13:18:14 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Nov 24 13:18:14 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   4 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   7 +-
 .../apache/spark/scheduler/SparkListener.scala  |  11 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |  13 +-
 .../scala/org/apache/spark/ui/UIUtils.scala     |  27 +-
 .../org/apache/spark/ui/jobs/AllJobsPage.scala  | 151 ++++++++++
 .../apache/spark/ui/jobs/AllStagesPage.scala    | 102 +++++++
 .../apache/spark/ui/jobs/ExecutorTable.scala    |   2 +-
 .../org/apache/spark/ui/jobs/JobPage.scala      | 177 ++++++++++++
 .../spark/ui/jobs/JobProgressListener.scala     |  99 ++++++-
 .../apache/spark/ui/jobs/JobProgressPage.scala  |  99 -------
 .../apache/spark/ui/jobs/JobProgressTab.scala   |  53 ----
 .../org/apache/spark/ui/jobs/JobsTab.scala      |  32 +++
 .../org/apache/spark/ui/jobs/PoolPage.scala     |   7 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala    |   2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   2 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  43 +--
 .../org/apache/spark/ui/jobs/StagesTab.scala    |  51 ++++
 .../scala/org/apache/spark/ui/jobs/UIData.scala |  21 +-
 .../org/apache/spark/util/JsonProtocol.scala    |  23 +-
 .../org/apache/spark/ui/UISeleniumSuite.scala   | 201 +++++++++++++-
 .../ui/jobs/JobProgressListenerSuite.scala      |   8 +-
 .../apache/spark/util/JsonProtocolSuite.scala   | 276 ++++++++++++++++++-
 23 files changed, 1195 insertions(+), 216 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 586c1cc..9b0d5be 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -83,6 +83,8 @@ class SparkContext(config: SparkConf) extends Logging {
   // contains a map from hostname to a list of input format splits on the host.
   private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
 
+  val startTime = System.currentTimeMillis()
+
   /**
    * Create a SparkContext that loads settings from system properties (for instance, when
    * launching with ./bin/spark-submit).
@@ -269,8 +271,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
 
-  val startTime = System.currentTimeMillis()
-
   // Add each JAR given through the constructor
   if (jars != null) {
     jars.foreach(addJar)

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 2244951..b1222af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -751,14 +751,15 @@ class DAGScheduler(
         localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1
       if (shouldRunLocally) {
         // Compute very short actions like first() or take() with no parent stages locally.
-        listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
+        listenerBus.post(SparkListenerJobStart(job.jobId, Seq.empty, properties))
         runLocally(job)
       } else {
         jobIdToActiveJob(jobId) = job
         activeJobs += job
         finalStage.resultOfJob = Some(job)
-        listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray,
-          properties))
+        val stageIds = jobIdToStageIds(jobId).toArray
+        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
+        listenerBus.post(SparkListenerJobStart(job.jobId, stageInfos, properties))
         submitStage(finalStage)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86afe3b..b62b0c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -56,8 +56,15 @@ case class SparkListenerTaskEnd(
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerJobStart(jobId: Int, stageIds: Seq[Int], properties: Properties = null)
-  extends SparkListenerEvent
+case class SparkListenerJobStart(
+    jobId: Int,
+    stageInfos: Seq[StageInfo],
+    properties: Properties = null)
+  extends SparkListenerEvent {
+  // Note: this is here for backwards-compatibility with older versions of this event which
+  // only stored stageIds and not StageInfos:
+  val stageIds: Seq[Int] = stageInfos.map(_.stageId)
+}
 
 @DeveloperApi
 case class SparkListenerJobEnd(jobId: Int, jobResult: JobResult) extends SparkListenerEvent
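
A note for downstream code: field access such as `event.stageIds` keeps working, but positional pattern matches now bind `Seq[StageInfo]` in the second slot rather than `Seq[Int]`. A minimal sketch of the forward-compatible access pattern (the `summarize` helper is illustrative only):

    import org.apache.spark.scheduler.SparkListenerJobStart

    // Prefer field access over positional extraction, since the second
    // constructor parameter is now Seq[StageInfo] rather than Seq[Int].
    def summarize(event: SparkListenerJobStart): String =
      s"job ${event.jobId}: stages ${event.stageIds.mkString(", ")}"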

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 049938f..176907d 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -23,7 +23,7 @@ import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
 import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
-import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
+import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
 import org.apache.spark.ui.storage.{StorageListener, StorageTab}
 
 /**
@@ -43,17 +43,20 @@ private[spark] class SparkUI private (
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {
 
+  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
+
   /** Initialize all components of the server. */
   def initialize() {
-    val jobProgressTab = new JobProgressTab(this)
-    attachTab(jobProgressTab)
+    attachTab(new JobsTab(this))
+    val stagesTab = new StagesTab(this)
+    attachTab(stagesTab)
     attachTab(new StorageTab(this))
     attachTab(new EnvironmentTab(this))
     attachTab(new ExecutorsTab(this))
     attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"))
-    attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
+    attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
     attachHandler(
-      createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
+      createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
     // If the UI is live, then serve
     sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 7bc1e24..0c418be 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -169,7 +169,8 @@ private[spark] object UIUtils extends Logging {
       title: String,
       content: => Seq[Node],
       activeTab: SparkUITab,
-      refreshInterval: Option[Int] = None): Seq[Node] = {
+      refreshInterval: Option[Int] = None,
+      helpText: Option[String] = None): Seq[Node] = {
 
     val appName = activeTab.appName
     val shortAppName = if (appName.length < 36) appName else appName.take(32) + "..."
@@ -178,6 +179,9 @@ private[spark] object UIUtils extends Logging {
         <a href={prependBaseUri(activeTab.basePath, "/" + tab.prefix + "/")}>{tab.name}</a>
       </li>
     }
+    val helpButton: Seq[Node] = helpText.map { helpText =>
+      <a data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</a>
+    }.getOrElse(Seq.empty)
 
     <html>
       <head>
@@ -201,6 +205,7 @@ private[spark] object UIUtils extends Logging {
             <div class="span12">
               <h3 style="vertical-align: bottom; display: inline-block;">
                 {title}
+                {helpButton}
               </h3>
             </div>
           </div>
@@ -283,4 +288,24 @@ private[spark] object UIUtils extends Logging {
       </tbody>
     </table>
   }
+
+  def makeProgressBar(
+      started: Int,
+      completed: Int,
+      failed: Int,
+      skipped: Int,
+      total: Int): Seq[Node] = {
+    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
+    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
+
+    <div class="progress">
+      <span style="text-align:center; position:absolute; width:100%; left:0;">
+        {completed}/{total}
+        { if (failed > 0) s"($failed failed)" }
+        { if (skipped > 0) s"($skipped skipped)" }
+      </span>
+      <div class="bar bar-completed" style={completeWidth}></div>
+      <div class="bar bar-running" style={startWidth}></div>
+    </div>
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
new file mode 100644
index 0000000..ea2d187
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.xml.{Node, NodeSeq}
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.jobs.UIData.JobUIData
+
+/** Page showing list of all ongoing and recently finished jobs */
+private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
+  private val startTime: Option[Long] = parent.sc.map(_.startTime)
+  private val listener = parent.listener
+
+  private def jobsTable(jobs: Seq[JobUIData]): Seq[Node] = {
+    val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
+
+    val columns: Seq[Node] = {
+      <th>{if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"}</th>
+      <th>Description</th>
+      <th>Submitted</th>
+      <th>Duration</th>
+      <th class="sorttable_nosort">Stages: Succeeded/Total</th>
+      <th class="sorttable_nosort">Tasks (for all stages): Succeeded/Total</th>
+    }
+
+    def makeRow(job: JobUIData): Seq[Node] = {
+      val lastStageInfo = listener.stageIdToInfo.get(job.stageIds.max)
+      val lastStageData = lastStageInfo.flatMap { s =>
+        listener.stageIdToData.get((s.stageId, s.attemptId))
+      }
+      val isComplete = job.status == JobExecutionStatus.SUCCEEDED
+      val lastStageName = lastStageInfo.map(_.name).getOrElse("(Unknown Stage Name)")
+      val lastStageDescription = lastStageData.flatMap(_.description).getOrElse("")
+      val duration: Option[Long] = {
+        job.startTime.map { start =>
+          val end = job.endTime.getOrElse(System.currentTimeMillis())
+          end - start
+        }
+      }
+      val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown")
+      val formattedSubmissionTime = job.startTime.map(UIUtils.formatDate).getOrElse("Unknown")
+      val detailUrl =
+        "%s/jobs/job?id=%s".format(UIUtils.prependBaseUri(parent.basePath), job.jobId)
+      <tr>
+        <td sorttable_customkey={job.jobId.toString}>
+          {job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
+        </td>
+        <td>
+          <div><em>{lastStageDescription}</em></div>
+          <a href={detailUrl}>{lastStageName}</a>
+        </td>
+        <td sorttable_customkey={job.startTime.getOrElse(-1).toString}>
+          {formattedSubmissionTime}
+        </td>
+        <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
+        <td class="stage-progress-cell">
+          {job.completedStageIndices.size}/{job.stageIds.size - job.numSkippedStages}
+          {if (job.numFailedStages > 0) s"(${job.numFailedStages} failed)"}
+          {if (job.numSkippedStages > 0) s"(${job.numSkippedStages} skipped)"}
+        </td>
+        <td class="progress-cell">
+          {UIUtils.makeProgressBar(started = job.numActiveTasks, completed = job.numCompletedTasks,
+           failed = job.numFailedTasks, skipped = job.numSkippedTasks,
+           total = job.numTasks - job.numSkippedTasks)}
+        </td>
+      </tr>
+    }
+
+    <table class="table table-bordered table-striped table-condensed sortable">
+      <thead>{columns}</thead>
+      <tbody>
+        {jobs.map(makeRow)}
+      </tbody>
+    </table>
+  }
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val activeJobs = listener.activeJobs.values.toSeq
+      val completedJobs = listener.completedJobs.reverse.toSeq
+      val failedJobs = listener.failedJobs.reverse.toSeq
+      val now = System.currentTimeMillis
+
+      val activeJobsTable =
+        jobsTable(activeJobs.sortBy(_.startTime.getOrElse(-1L)).reverse)
+      val completedJobsTable =
+        jobsTable(completedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+      val failedJobsTable =
+        jobsTable(failedJobs.sortBy(_.endTime.getOrElse(-1L)).reverse)
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            {if (startTime.isDefined) {
+              // Total duration is not meaningful unless the UI is live
+              <li>
+                <strong>Total Duration: </strong>
+                {UIUtils.formatDuration(now - startTime.get)}
+              </li>
+            }}
+            <li>
+              <strong>Scheduling Mode: </strong>
+              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+            </li>
+            <li>
+              <a href="#active"><strong>Active Jobs:</strong></a>
+              {activeJobs.size}
+            </li>
+            <li>
+              <a href="#completed"><strong>Completed Jobs:</strong></a>
+              {completedJobs.size}
+            </li>
+            <li>
+              <a href="#failed"><strong>Failed Jobs:</strong></a>
+              {failedJobs.size}
+            </li>
+          </ul>
+        </div>
+
+      val content = summary ++
+        <h4 id="active">Active Jobs ({activeJobs.size})</h4> ++ activeJobsTable ++
+        <h4 id="completed">Completed Jobs ({completedJobs.size})</h4> ++ completedJobsTable ++
+        <h4 id="failed">Failed Jobs ({failedJobs.size})</h4> ++ failedJobsTable
+
+      val helpText = """A job is triggered by an action, like "count()" or "saveAsTextFile()".""" +
+        " Click on a job's title to see information about the stages of tasks associated with" +
+        " the job."
+
+      UIUtils.headerSparkPage("Spark Jobs", content, parent, helpText = Some(helpText))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
new file mode 100644
index 0000000..b0f8ca2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.{Node, NodeSeq}
+
+import org.apache.spark.scheduler.Schedulable
+import org.apache.spark.ui.{WebUIPage, UIUtils}
+
+/** Page showing list of all ongoing and recently finished stages and pools */
+private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
+  private val sc = parent.sc
+  private val listener = parent.listener
+  private def isFairScheduler = parent.isFairScheduler
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val activeStages = listener.activeStages.values.toSeq
+      val completedStages = listener.completedStages.reverse.toSeq
+      val numCompletedStages = listener.numCompletedStages
+      val failedStages = listener.failedStages.reverse.toSeq
+      val numFailedStages = listener.numFailedStages
+      val now = System.currentTimeMillis
+
+      val activeStagesTable =
+        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
+          parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+          killEnabled = parent.killEnabled)
+      val completedStagesTable =
+        new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
+          parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+      val failedStagesTable =
+        new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
+          parent.listener, isFairScheduler = parent.isFairScheduler)
+
+      // For now, pool information is only accessible in live UIs
+      val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
+      val poolTable = new PoolTable(pools, parent)
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            {if (sc.isDefined) {
+              // Total duration is not meaningful unless the UI is live
+              <li>
+                <strong>Total Duration: </strong>
+                {UIUtils.formatDuration(now - sc.get.startTime)}
+              </li>
+            }}
+            <li>
+              <strong>Scheduling Mode: </strong>
+              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
+            </li>
+            <li>
+              <a href="#active"><strong>Active Stages:</strong></a>
+              {activeStages.size}
+            </li>
+            <li>
+              <a href="#completed"><strong>Completed Stages:</strong></a>
+              {numCompletedStages}
+            </li>
+            <li>
+              <a href="#failed"><strong>Failed Stages:</strong></a>
+              {numFailedStages}
+            </li>
+          </ul>
+        </div>
+
+      val content = summary ++
+        {if (sc.isDefined && isFairScheduler) {
+          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
+        } else {
+          Seq[Node]()
+        }} ++
+        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+        activeStagesTable.toNodeSeq ++
+        <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
+        completedStagesTable.toNodeSeq ++
+        <h4 id="failed">Failed Stages ({numFailedStages})</h4> ++
+        failedStagesTable.toNodeSeq
+
+      UIUtils.headerSparkPage("Spark Stages (for all jobs)", content, parent)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index fa0f96b..35bbe8b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -25,7 +25,7 @@ import org.apache.spark.ui.jobs.UIData.StageUIData
 import org.apache.spark.util.Utils
 
 /** Stage summary grouped by executors. */
-private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: JobProgressTab) {
+private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: StagesTab) {
   private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
new file mode 100644
index 0000000..77d3620
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import scala.collection.mutable
+import scala.xml.{NodeSeq, Node}
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.ui.{UIUtils, WebUIPage}
+
+/** Page showing statistics and stage list for a given job */
+private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
+  private val listener = parent.listener
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    listener.synchronized {
+      val jobId = request.getParameter("id").toInt
+      val jobDataOption = listener.jobIdToData.get(jobId)
+      if (jobDataOption.isEmpty) {
+        val content =
+          <div>
+            <p>No information to display for job {jobId}</p>
+          </div>
+        return UIUtils.headerSparkPage(
+          s"Details for Job $jobId", content, parent)
+      }
+      val jobData = jobDataOption.get
+      val isComplete = jobData.status != JobExecutionStatus.RUNNING
+      val stages = jobData.stageIds.map { stageId =>
+        // This could be empty if the JobProgressListener hasn't received information about the
+        // stage or if the stage information has been garbage collected
+        listener.stageIdToInfo.getOrElse(stageId,
+          new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
+      }
+
+      val activeStages = mutable.Buffer[StageInfo]()
+      val completedStages = mutable.Buffer[StageInfo]()
+      // If the job is completed, then any pending stages are displayed as "skipped":
+      val pendingOrSkippedStages = mutable.Buffer[StageInfo]()
+      val failedStages = mutable.Buffer[StageInfo]()
+      for (stage <- stages) {
+        if (stage.submissionTime.isEmpty) {
+          pendingOrSkippedStages += stage
+        } else if (stage.completionTime.isDefined) {
+          if (stage.failureReason.isDefined) {
+            failedStages += stage
+          } else {
+            completedStages += stage
+          }
+        } else {
+          activeStages += stage
+        }
+      }
+
+      val activeStagesTable =
+        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
+          parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+          killEnabled = parent.killEnabled)
+      val pendingOrSkippedStagesTable =
+        new StageTableBase(pendingOrSkippedStages.sortBy(_.stageId).reverse,
+          parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+          killEnabled = false)
+      val completedStagesTable =
+        new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent.basePath,
+          parent.listener, isFairScheduler = parent.isFairScheduler, killEnabled = false)
+      val failedStagesTable =
+        new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent.basePath,
+          parent.listener, isFairScheduler = parent.isFairScheduler)
+
+      val shouldShowActiveStages = activeStages.nonEmpty
+      val shouldShowPendingStages = !isComplete && pendingOrSkippedStages.nonEmpty
+      val shouldShowCompletedStages = completedStages.nonEmpty
+      val shouldShowSkippedStages = isComplete && pendingOrSkippedStages.nonEmpty
+      val shouldShowFailedStages = failedStages.nonEmpty
+
+      val summary: NodeSeq =
+        <div>
+          <ul class="unstyled">
+            <li>
+              <strong>Status:</strong>
+              {jobData.status}
+            </li>
+            {
+              if (jobData.jobGroup.isDefined) {
+                <li>
+                  <strong>Job Group:</strong>
+                  {jobData.jobGroup.get}
+                </li>
+              }
+            }
+            {
+              if (shouldShowActiveStages) {
+                <li>
+                  <a href="#active"><strong>Active Stages:</strong></a>
+                  {activeStages.size}
+                </li>
+              }
+            }
+            {
+              if (shouldShowPendingStages) {
+                <li>
+                  <a href="#pending">
+                    <strong>Pending Stages:</strong>
+                  </a>{pendingOrSkippedStages.size}
+                </li>
+              }
+            }
+            {
+              if (shouldShowCompletedStages) {
+                <li>
+                  <a href="#completed"><strong>Completed Stages:</strong></a>
+                  {completedStages.size}
+                </li>
+              }
+            }
+            {
+              if (shouldShowSkippedStages) {
+              <li>
+                <a href="#skipped"><strong>Skipped Stages:</strong></a>
+                {pendingOrSkippedStages.size}
+              </li>
+            }
+            }
+            {
+              if (shouldShowFailedStages) {
+                <li>
+                  <a href="#failed"><strong>Failed Stages:</strong></a>
+                  {failedStages.size}
+                </li>
+              }
+            }
+          </ul>
+        </div>
+
+      var content = summary
+      if (shouldShowActiveStages) {
+        content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+          activeStagesTable.toNodeSeq
+      }
+      if (shouldShowPendingStages) {
+        content ++= <h4 id="pending">Pending Stages ({pendingOrSkippedStages.size})</h4> ++
+          pendingOrSkippedStagesTable.toNodeSeq
+      }
+      if (shouldShowCompletedStages) {
+        content ++= <h4 id="completed">Completed Stages ({completedStages.size})</h4> ++
+          completedStagesTable.toNodeSeq
+      }
+      if (shouldShowSkippedStages) {
+        content ++= <h4 id="skipped">Skipped Stages ({pendingOrSkippedStages.size})</h4> ++
+          pendingOrSkippedStagesTable.toNodeSeq
+      }
+      if (shouldShowFailedStages) {
+        content ++= <h4 id="failed">Failed Stages ({failedStages.size})</h4> ++
+          failedStagesTable.toNodeSeq
+      }
+      UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ccdcf0e..72935be 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -49,8 +49,6 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   type PoolName = String
   type ExecutorId = String
 
-  // Define all of our state:
-
   // Jobs:
   val activeJobs = new HashMap[JobId, JobUIData]
   val completedJobs = ListBuffer[JobUIData]()
@@ -60,9 +58,11 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   // Stages:
   val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
+  val skippedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
   val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
   val stageIdToInfo = new HashMap[StageId, StageInfo]
+  val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
   val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
   // Total of completed and failed stages that have ever been run.  These may be greater than
   // `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
@@ -95,7 +95,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
     Map(
       "activeStages" -> activeStages.size,
       "activeJobs" -> activeJobs.size,
-      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum
+      "poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum,
+      "stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum
     )
   }
 
@@ -106,6 +107,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       "completedJobs" -> completedJobs.size,
       "failedJobs" -> failedJobs.size,
       "completedStages" -> completedStages.size,
+      "skippedStages" -> skippedStages.size,
       "failedStages" -> failedStages.size
     )
   }
@@ -144,11 +146,39 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   }
 
   override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
-    val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobGroup = for (
+      props <- Option(jobStart.properties);
+      group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    ) yield group
     val jobData: JobUIData =
-      new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING)
+      new JobUIData(
+        jobId = jobStart.jobId,
+        startTime = Some(System.currentTimeMillis),
+        endTime = None,
+        stageIds = jobStart.stageIds,
+        jobGroup = jobGroup,
+        status = JobExecutionStatus.RUNNING)
+    // Compute (a potential underestimate of) the number of tasks that will be run by this job.
+    // This may be an underestimate because the job start event references all of the result
+    // stage's transitive stage dependencies, but some of these stages might be skipped if their
+    // output is available from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+    jobData.numTasks = {
+      val allStages = jobStart.stageInfos
+      val missingStages = allStages.filter(_.completionTime.isEmpty)
+      missingStages.map(_.numTasks).sum
+    }
     jobIdToData(jobStart.jobId) = jobData
     activeJobs(jobStart.jobId) = jobData
+    for (stageId <- jobStart.stageIds) {
+      stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
+    }
+    // If there's no information for a stage, store the StageInfo received from the scheduler
+    // so that we can display stage descriptions for pending stages:
+    for (stageInfo <- jobStart.stageInfos) {
+      stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
+      stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
+    }
   }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
@@ -156,6 +186,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
       new JobUIData(jobId = jobEnd.jobId)
     }
+    jobData.endTime = Some(System.currentTimeMillis())
     jobEnd.jobResult match {
       case JobSucceeded =>
         completedJobs += jobData
@@ -166,6 +197,20 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
         trimJobsIfNecessary(failedJobs)
         jobData.status = JobExecutionStatus.FAILED
     }
+    for (stageId <- jobData.stageIds) {
+      stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
+        jobsUsingStage.remove(jobEnd.jobId)
+        stageIdToInfo.get(stageId).foreach { stageInfo =>
+          if (stageInfo.submissionTime.isEmpty) {
+            // if this stage is pending, it won't complete, so mark it as "skipped":
+            skippedStages += stageInfo
+            trimStagesIfNecessary(skippedStages)
+            jobData.numSkippedStages += 1
+            jobData.numSkippedTasks += stageInfo.numTasks
+          }
+        }
+      }
+    }
   }
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
@@ -193,6 +238,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       numFailedStages += 1
       trimStagesIfNecessary(failedStages)
     }
+
+    for (
+      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
+      jobId <- activeJobsDependentOnStage;
+      jobData <- jobIdToData.get(jobId)
+    ) {
+      jobData.numActiveStages -= 1
+      if (stage.failureReason.isEmpty) {
+        jobData.completedStageIndices.add(stage.stageId)
+      } else {
+        jobData.numFailedStages += 1
+      }
+    }
   }
 
   /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
@@ -214,6 +272,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
     val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
     stages(stage.stageId) = stage
+
+    for (
+      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
+      jobId <- activeJobsDependentOnStage;
+      jobData <- jobIdToData.get(jobId)
+    ) {
+      jobData.numActiveStages += 1
+    }
   }
 
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
@@ -226,6 +292,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       stageData.numActiveTasks += 1
       stageData.taskData.put(taskInfo.taskId, new TaskUIData(taskInfo))
     }
+    for (
+      activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
+      jobId <- activeJobsDependentOnStage;
+      jobData <- jobIdToData.get(jobId)
+    ) {
+      jobData.numActiveTasks += 1
+    }
   }
 
   override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
@@ -283,6 +356,20 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       taskData.taskInfo = info
       taskData.taskMetrics = metrics
       taskData.errorMessage = errorMessage
+
+      for (
+        activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
+        jobId <- activeJobsDependentOnStage;
+        jobData <- jobIdToData.get(jobId)
+      ) {
+        jobData.numActiveTasks -= 1
+        taskEnd.reason match {
+          case Success =>
+            jobData.numCompletedTasks += 1
+          case _ =>
+            jobData.numFailedTasks += 1
+        }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
deleted file mode 100644
index 83a7898..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import scala.xml.{Node, NodeSeq}
-
-import org.apache.spark.scheduler.Schedulable
-import org.apache.spark.ui.{WebUIPage, UIUtils}
-
-/** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
-  private val sc = parent.sc
-  private val listener = parent.listener
-  private def isFairScheduler = parent.isFairScheduler
-
-  def render(request: HttpServletRequest): Seq[Node] = {
-    listener.synchronized {
-      val activeStages = listener.activeStages.values.toSeq
-      val completedStages = listener.completedStages.reverse.toSeq
-      val numCompletedStages = listener.numCompletedStages
-      val failedStages = listener.failedStages.reverse.toSeq
-      val numFailedStages = listener.numFailedStages
-      val now = System.currentTimeMillis
-
-      val activeStagesTable =
-        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
-          parent, parent.killEnabled)
-      val completedStagesTable =
-        new StageTableBase(completedStages.sortBy(_.submissionTime).reverse, parent)
-      val failedStagesTable =
-        new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
-
-      // For now, pool information is only accessible in live UIs
-      val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
-      val poolTable = new PoolTable(pools, parent)
-
-      val summary: NodeSeq =
-        <div>
-          <ul class="unstyled">
-            {if (sc.isDefined) {
-              // Total duration is not meaningful unless the UI is live
-              <li>
-                <strong>Total Duration: </strong>
-                {UIUtils.formatDuration(now - sc.get.startTime)}
-              </li>
-            }}
-            <li>
-              <strong>Scheduling Mode: </strong>
-              {listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
-            </li>
-            <li>
-              <a href="#active"><strong>Active Stages:</strong></a>
-              {activeStages.size}
-            </li>
-            <li>
-              <a href="#completed"><strong>Completed Stages:</strong></a>
-              {numCompletedStages}
-            </li>
-             <li>
-             <a href="#failed"><strong>Failed Stages:</strong></a>
-              {numFailedStages}
-            </li>
-          </ul>
-        </div>
-
-      val content = summary ++
-        {if (sc.isDefined && isFairScheduler) {
-          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
-        } else {
-          Seq[Node]()
-        }} ++
-        <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-        activeStagesTable.toNodeSeq ++
-        <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
-        completedStagesTable.toNodeSeq ++
-        <h4 id ="failed">Failed Stages ({numFailedStages})</h4> ++
-        failedStagesTable.toNodeSeq
-
-      UIUtils.headerSparkPage("Spark Stages", content, parent)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
deleted file mode 100644
index 03ca918..0000000
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.ui.jobs
-
-import javax.servlet.http.HttpServletRequest
-
-import org.apache.spark.SparkConf
-import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.{SparkUI, SparkUITab}
-
-/** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
-  val sc = parent.sc
-  val conf = sc.map(_.conf).getOrElse(new SparkConf)
-  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
-  val listener = parent.jobProgressListener
-
-  attachPage(new JobProgressPage(this))
-  attachPage(new StagePage(this))
-  attachPage(new PoolPage(this))
-
-  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
-  def handleKillRequest(request: HttpServletRequest) =  {
-    if ((killEnabled) && (parent.securityManager.checkModifyPermissions(request.getRemoteUser))) {
-      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
-      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
-      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
-        sc.get.cancelStage(stageId)
-      }
-      // Do a quick pause here to give Spark time to kill the stage so it shows up as
-      // killed after the refresh. Note that this will block the serving thread so the
-      // time should be limited in duration.
-      Thread.sleep(100)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
new file mode 100644
index 0000000..b2bbfde
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+/** Web UI showing progress status of all jobs in the given SparkContext. */
+private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
+  val sc = parent.sc
+  val killEnabled = parent.killEnabled
+  val listener = parent.jobProgressListener
+  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+
+  attachPage(new AllJobsPage(this))
+  attachPage(new JobPage(this))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 770d99e..5fc6cc7 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -25,7 +25,7 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
+private[ui] class PoolPage(parent: StagesTab) extends WebUIPage("pool") {
   private val sc = parent.sc
   private val listener = parent.listener
 
@@ -37,8 +37,9 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
         case Some(s) => s.values.toSeq
         case None => Seq[StageInfo]()
       }
-      val activeStagesTable =
-        new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent)
+      val activeStagesTable = new StageTableBase(activeStages.sortBy(_.submissionTime).reverse,
+        parent.basePath, parent.listener, isFairScheduler = parent.isFairScheduler,
+        killEnabled = parent.killEnabled)
 
       // For now, pool information is only accessible in live UIs
       val pools = sc.map(_.getPoolForName(poolName).get).toSeq

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 64178e1..df1899e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,7 +24,7 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
   private val listener = parent.listener
 
   def toNodeSeq: Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 36afc49..40e05f8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{Utils, Distribution}
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 
 /** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
+private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
   private val listener = parent.listener
 
   def render(request: HttpServletRequest): Seq[Node] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index 2ff561c..e7d6244 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -31,11 +31,10 @@ import org.apache.spark.util.Utils
 /** Page showing list of all ongoing and recently finished stages */
 private[ui] class StageTableBase(
     stages: Seq[StageInfo],
-    parent: JobProgressTab,
-    killEnabled: Boolean = false) {
-
-  private val listener = parent.listener
-  protected def isFairScheduler = parent.isFairScheduler
+    basePath: String,
+    listener: JobProgressListener,
+    isFairScheduler: Boolean,
+    killEnabled: Boolean) {
 
   protected def columns: Seq[Node] = {
     <th>Stage Id</th> ++
@@ -73,25 +72,11 @@ private[ui] class StageTableBase(
     </table>
   }
 
-  private def makeProgressBar(started: Int, completed: Int, failed: Int, total: Int): Seq[Node] =
-  {
-    val completeWidth = "width: %s%%".format((completed.toDouble/total)*100)
-    val startWidth = "width: %s%%".format((started.toDouble/total)*100)
-
-    <div class="progress">
-      <span style="text-align:center; position:absolute; width:100%; left:0;">
-        {completed}/{total} { if (failed > 0) s"($failed failed)" else "" }
-      </span>
-      <div class="bar bar-completed" style={completeWidth}></div>
-      <div class="bar bar-running" style={startWidth}></div>
-    </div>
-  }
-
   private def makeDescription(s: StageInfo): Seq[Node] = {
     // scalastyle:off
     val killLink = if (killEnabled) {
       val killLinkUri = "%s/stages/stage/kill?id=%s&terminate=true"
-        .format(UIUtils.prependBaseUri(parent.basePath), s.stageId)
+        .format(UIUtils.prependBaseUri(basePath), s.stageId)
       val confirm = "return window.confirm('Are you sure you want to kill stage %s ?');"
         .format(s.stageId)
       <span class="kill-link">
@@ -101,7 +86,7 @@ private[ui] class StageTableBase(
     // scalastyle:on
 
     val nameLinkUri = "%s/stages/stage?id=%s&attempt=%s"
-      .format(UIUtils.prependBaseUri(parent.basePath), s.stageId, s.attemptId)
+      .format(UIUtils.prependBaseUri(basePath), s.stageId, s.attemptId)
     val nameLink = <a href={nameLinkUri}>{s.name}</a>
 
     val cachedRddInfos = s.rddInfos.filter(_.numCachedPartitions > 0)
@@ -115,7 +100,7 @@ private[ui] class StageTableBase(
           Text("RDD: ") ++
           // scalastyle:off
           cachedRddInfos.map { i =>
-            <a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(parent.basePath), i.id)}>{i.name}</a>
+            <a href={"%s/storage/rdd?id=%d".format(UIUtils.prependBaseUri(basePath), i.id)}>{i.name}</a>
           }
           // scalastyle:on
         }}
@@ -167,7 +152,7 @@ private[ui] class StageTableBase(
     {if (isFairScheduler) {
       <td>
         <a href={"%s/stages/pool?poolname=%s"
-          .format(UIUtils.prependBaseUri(parent.basePath), stageData.schedulingPool)}>
+          .format(UIUtils.prependBaseUri(basePath), stageData.schedulingPool)}>
           {stageData.schedulingPool}
         </a>
       </td>
@@ -180,8 +165,9 @@ private[ui] class StageTableBase(
     </td>
     <td sorttable_customkey={duration.getOrElse(-1).toString}>{formattedDuration}</td>
     <td class="progress-cell">
-      {makeProgressBar(stageData.numActiveTasks, stageData.completedIndices.size,
-        stageData.numFailedTasks, s.numTasks)}
+      {UIUtils.makeProgressBar(started = stageData.numActiveTasks,
+        completed = stageData.completedIndices.size, failed = stageData.numFailedTasks,
+        skipped = 0, total = s.numTasks)}
     </td>
     <td sorttable_customkey={inputRead.toString}>{inputReadWithUnit}</td>
     <td sorttable_customkey={outputWrite.toString}>{outputWriteWithUnit}</td>
@@ -195,9 +181,10 @@ private[ui] class StageTableBase(
 
 private[ui] class FailedStageTable(
     stages: Seq[StageInfo],
-    parent: JobProgressTab,
-    killEnabled: Boolean = false)
-  extends StageTableBase(stages, parent, killEnabled) {
+    basePath: String,
+    listener: JobProgressListener,
+    isFairScheduler: Boolean)
+  extends StageTableBase(stages, basePath, listener, isFairScheduler, killEnabled = false) {
 
   override protected def columns: Seq[Node] = super.columns ++ <th>Failure Reason</th>
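
The new progress-bar call above delegates to a shared UIUtils.makeProgressBar that also takes a skipped count. A sketch extrapolated from the per-table makeProgressBar deleted above; the real helper's markup may differ:

    import scala.xml.Node

    object ProgressBarSketch {
      // Extrapolated from the deleted StageTableBase.makeProgressBar; illustrative only.
      def makeProgressBar(
          started: Int, completed: Int, failed: Int, skipped: Int, total: Int): Seq[Node] = {
        val completeWidth = "width: %s%%".format((completed.toDouble / total) * 100)
        val startWidth = "width: %s%%".format((started.toDouble / total) * 100)
        <div class="progress">
          <span style="text-align:center; position:absolute; width:100%; left:0;">
            {completed}/{total}
            {if (failed > 0) s"($failed failed)" else ""}
            {if (skipped > 0) s"($skipped skipped)" else ""}
          </span>
          <div class="bar bar-completed" style={completeWidth}></div>
          <div class="bar bar-running" style={startWidth}></div>
        </div>
      }
    }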
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
new file mode 100644
index 0000000..937261d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.jobs
+
+import javax.servlet.http.HttpServletRequest
+
+import org.apache.spark.scheduler.SchedulingMode
+import org.apache.spark.ui.{SparkUI, SparkUITab}
+
+/** Web UI showing progress status of all stages in the given SparkContext. */
+private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
+  val sc = parent.sc
+  val killEnabled = parent.killEnabled
+  val listener = parent.jobProgressListener
+
+  attachPage(new AllStagesPage(this))
+  attachPage(new StagePage(this))
+  attachPage(new PoolPage(this))
+
+  def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
+
+  def handleKillRequest(request: HttpServletRequest) = {
+    if (killEnabled && parent.securityManager.checkModifyPermissions(request.getRemoteUser)) {
+      val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
+      val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
+      if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
+        sc.get.cancelStage(stageId)
+      }
+      // Do a quick pause here to give Spark time to kill the stage so it shows up as
+      // killed after the refresh. Note that this will block the serving thread so the
+      // time should be limited in duration.
+      Thread.sleep(100)
+    }
+  }
+
+}
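
The Option(...) wrapping in handleKillRequest is what makes absent query parameters safe: getParameter returns null when a parameter is missing, and Option(null) is None. The same pattern in isolation (parseStageId is an illustrative name, not part of the patch):

    object KillParamSketch {
      // Option(x) is None when x is null, so a missing "?id=" parameter
      // falls back to -1 instead of throwing a NullPointerException.
      def parseStageId(rawParam: String): Int =
        Option(rawParam).getOrElse("-1").toInt

      def main(args: Array[String]): Unit = {
        assert(parseStageId(null) == -1)  // parameter absent from the request
        assert(parseStageId("7") == 7)
      }
    }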

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index 2f7d618..48fd7ca 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -40,9 +40,28 @@ private[jobs] object UIData {
 
   class JobUIData(
     var jobId: Int = -1,
+    var startTime: Option[Long] = None,
+    var endTime: Option[Long] = None,
     var stageIds: Seq[Int] = Seq.empty,
     var jobGroup: Option[String] = None,
-    var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN
+    var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
+    /* Tasks */
+    // `numTasks` is a potential underestimate of the true number of tasks that this job will
+    // run, because the job start event references all of the result stages' transitive stage
+    // dependencies, and some of those stages might be skipped if their output is available
+    // from earlier runs.
+    // See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
+    var numTasks: Int = 0,
+    var numActiveTasks: Int = 0,
+    var numCompletedTasks: Int = 0,
+    var numSkippedTasks: Int = 0,
+    var numFailedTasks: Int = 0,
+    /* Stages */
+    var numActiveStages: Int = 0,
+    // This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
+    var completedStageIndices: OpenHashSet[Int] = new OpenHashSet[Int](),
+    var numSkippedStages: Int = 0,
+    var numFailedStages: Int = 0
   )
 
   class StageUIData {
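
Tracking completed stages with a set of indices keeps the count idempotent under stage reruns, which a plain counter is not. A tiny sketch of why, using a mutable.HashSet in place of Spark's OpenHashSet (assumed equivalent for this purpose):

    import scala.collection.mutable

    object SetVsCounterSketch {
      val completedStageIndices = mutable.HashSet[Int]()

      def main(args: Array[String]): Unit = {
        completedStageIndices += 3
        completedStageIndices += 3  // a rerun of the same stage is a no-op
        assert(completedStageIndices.size == 1)  // a plain counter would now read 2
      }
    }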

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 7e536ed..7b5db1e 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -31,6 +31,21 @@ import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 import org.apache.spark._
 
+/**
+ * Serializes SparkListener events to/from JSON.  This protocol provides strong backwards-
+ * and forwards-compatibility guarantees: any version of Spark should be able to read JSON output
+ * written by any other version, including newer versions.
+ *
+ * JsonProtocolSuite contains backwards-compatibility tests which check that the current version of
+ * JsonProtocol is able to read output written by earlier versions.  We do not currently have tests
+ * for reading newer JSON output with older Spark versions.
+ *
+ * To ensure that we provide these guarantees, follow these rules when modifying these methods:
+ *
+ *  - Never delete any JSON fields.
+ *  - Any new JSON fields should be optional; use `Utils.jsonOption` when reading these fields
+ *    in `*FromJson` methods.
+ */
 private[spark] object JsonProtocol {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
@@ -121,6 +136,7 @@ private[spark] object JsonProtocol {
     val properties = propertiesToJson(jobStart.properties)
     ("Event" -> Utils.getFormattedClassName(jobStart)) ~
     ("Job ID" -> jobStart.jobId) ~
+    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
     ("Stage IDs" -> jobStart.stageIds) ~
     ("Properties" -> properties)
   }
@@ -455,7 +471,12 @@ private[spark] object JsonProtocol {
     val jobId = (json \ "Job ID").extract[Int]
     val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
     val properties = propertiesFromJson(json \ "Properties")
-    SparkListenerJobStart(jobId, stageIds, properties)
+    // The "Stage Infos" field was added in Spark 1.2.0
+    val stageInfos = Utils.jsonOption(json \ "Stage Infos")
+      .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
+        stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
+      }
+    SparkListenerJobStart(jobId, stageInfos, properties)
   }
 
   def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
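
The fallback above is the general recipe for optional fields: a missing key yields JNothing, which Utils.jsonOption maps to None, and the reader substitutes a default. A minimal json4s sketch of the same idea (jsonOption here is a local stand-in for Spark's Utils.jsonOption, assumed to treat JNothing as absence):

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    object OptionalFieldSketch {
      implicit val formats = DefaultFormats

      def jsonOption(json: JValue): Option[JValue] = json match {
        case JNothing => None
        case value => Some(value)
      }

      def main(args: Array[String]): Unit = {
        // An event written before the "Stage Infos" field existed:
        val oldEvent = parse("""{"Job ID": 10, "Stage IDs": [1, 2]}""")
        val stageInfos = jsonOption(oldEvent \ "Stage Infos")
          .map(_.extract[Seq[JValue]])
          .getOrElse(Seq.empty)
        assert(stageInfos.isEmpty)  // absent field falls back to the default
      }
    }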

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index bacf6a1..d2857b8 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -17,16 +17,20 @@
 
 package org.apache.spark.ui
 
-import org.apache.spark.api.java.StorageLevels
-import org.apache.spark.{SparkException, SparkConf, SparkContext}
-import org.openqa.selenium.WebDriver
+import scala.collection.JavaConversions._
+
+import org.openqa.selenium.{By, WebDriver}
 import org.openqa.selenium.htmlunit.HtmlUnitDriver
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.selenium.WebBrowser
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark._
+import org.apache.spark.SparkContext._
 import org.apache.spark.LocalSparkContext._
+import org.apache.spark.api.java.StorageLevels
+import org.apache.spark.shuffle.FetchFailedException
 
 /**
  * Selenium tests for the Spark Web UI.  These tests are not run by default
@@ -89,7 +93,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers {
         sc.parallelize(1 to 10).map { x => throw new Exception()}.collect()
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to sc.ui.get.appUIAddress
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
         find(id("active")).get.text should be("Active Stages (0)")
         find(id("failed")).get.text should be("Failed Stages (1)")
       }
@@ -101,7 +105,7 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers {
         sc.parallelize(1 to 10).map { x => unserializableObject}.collect()
       }
       eventually(timeout(5 seconds), interval(50 milliseconds)) {
-        go to sc.ui.get.appUIAddress
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
         find(id("active")).get.text should be("Active Stages (0)")
         // The failure occurs before the stage becomes active, hence we should still show only one
         // failed stage, not two:
@@ -109,4 +113,191 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers {
       }
     }
   }
+
+  test("spark.ui.killEnabled should properly control kill button display") {
+    def getSparkContext(killEnabled: Boolean): SparkContext = {
+      val conf = new SparkConf()
+        .setMaster("local")
+        .setAppName("test")
+        .set("spark.ui.enabled", "true")
+        .set("spark.ui.killEnabled", killEnabled.toString)
+      new SparkContext(conf)
+    }
+
+    def hasKillLink = find(className("kill-link")).isDefined
+    def runSlowJob(sc: SparkContext) {
+      sc.parallelize(1 to 10).map { x => Thread.sleep(10000); x }.countAsync()
+    }
+
+    withSpark(getSparkContext(killEnabled = true)) { sc =>
+      runSlowJob(sc)
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        assert(hasKillLink)
+      }
+    }
+
+    withSpark(getSparkContext(killEnabled = false)) { sc =>
+      runSlowJob(sc)
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/stages")
+        assert(!hasKillLink)
+      }
+    }
+  }
+
+  test("jobs page should not display job group name unless some job was submitted in a job group") {
+    withSpark(newSparkContext()) { sc =>
+      // If no job has been run in a job group, then "(Job Group)" should not appear in the header
+      sc.parallelize(Seq(1, 2, 3)).count()
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
+        tableHeaders should not contain "Job Id (Job Group)"
+      }
+      // Once at least one job has been run in a job group, then we should display the group name:
+      sc.setJobGroup("my-job-group", "my-job-group-description")
+      sc.parallelize(Seq(1, 2, 3)).count()
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        val tableHeaders = findAll(cssSelector("th")).map(_.text).toSeq
+        tableHeaders should contain ("Job Id (Job Group)")
+      }
+    }
+  }
+
+  test("job progress bars should handle stage / task failures") {
+    withSpark(newSparkContext()) { sc =>
+      val data = sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity)
+      val shuffleHandle =
+        data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
+      // Simulate fetch failures:
+      val mappedData = data.map { x =>
+        val taskContext = TaskContext.get
+        if (taskContext.attemptId() == 1) {  // Cause this stage to fail on its first attempt.
+          val env = SparkEnv.get
+          val bmAddress = env.blockManager.blockManagerId
+          val shuffleId = shuffleHandle.shuffleId
+          val mapId = 0
+          val reduceId = taskContext.partitionId()
+          val message = "Simulated fetch failure"
+          throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
+        } else {
+          x
+        }
+      }
+      mappedData.count()
+      eventually(timeout(5 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        find(cssSelector(".stage-progress-cell")).get.text should be ("2/2 (1 failed)")
+        // Ideally, the following test would pass, but currently we overcount completed tasks
+        // if task recomputations occur:
+        // find(cssSelector(".progress-cell .progress")).get.text should be ("2/2 (1 failed)")
+        // Instead, we guarantee that the total number of tasks is always correct, while the number
+        // of completed tasks may be higher:
+        find(cssSelector(".progress-cell .progress")).get.text should be ("3/2 (1 failed)")
+      }
+    }
+  }
+
+  test("job details page should display useful information for stages that haven't started") {
+    withSpark(newSparkContext()) { sc =>
+      // Create a multi-stage job with a long delay in the first stage:
+      val rdd = sc.parallelize(Seq(1, 2, 3)).map { x =>
+        // This long sleep call won't slow down the tests because we don't actually need to wait
+        // for the job to finish.
+        Thread.sleep(20000)
+      }.groupBy(identity).map(identity).groupBy(identity).map(identity)
+      // Start the job:
+      rdd.countAsync()
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=0")
+        find(id("active")).get.text should be ("Active Stages (1)")
+        find(id("pending")).get.text should be ("Pending Stages (2)")
+        // Essentially, we want to check that none of the stage rows show
+        // "No data available for this stage". Checking for the absence of that string is brittle
+        // because someone could change the error message and cause this test to pass by accident.
+        // Instead, it's safer to check that each row contains a link to a stage details page.
+        findAll(cssSelector("tbody tr")).foreach { row =>
+          val link = row.underlying.findElement(By.xpath(".//a"))
+          link.getAttribute("href") should include ("stage")
+        }
+      }
+    }
+  }
+
+  test("job progress bars / cells reflect skipped stages / tasks") {
+    withSpark(newSparkContext()) { sc =>
+      // Create an RDD that involves multiple stages:
+      val rdd = sc.parallelize(1 to 8, 8)
+        .map(x => x).groupBy((x: Int) => x, numPartitions = 8)
+        .flatMap(x => x._2).groupBy((x: Int) => x, numPartitions = 8)
+      // Run it twice; this will cause the second job to have two "phantom" stages that were
+      // mentioned in its job start event but which were never actually executed:
+      rdd.count()
+      rdd.count()
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        // The completed jobs table should have two rows. The first row will be the most recent job:
+        val firstRow = find(cssSelector("tbody tr")).get.underlying
+        val firstRowColumns = firstRow.findElements(By.tagName("td"))
+        firstRowColumns(0).getText should be ("1")
+        firstRowColumns(4).getText should be ("1/1 (2 skipped)")
+        firstRowColumns(5).getText should be ("8/8 (16 skipped)")
+        // The second row is the first run of the job, where nothing was skipped:
+        val secondRow = findAll(cssSelector("tbody tr")).toSeq(1).underlying
+        val secondRowColumns = secondRow.findElements(By.tagName("td"))
+        secondRowColumns(0).getText should be ("0")
+        secondRowColumns(4).getText should be ("3/3")
+        secondRowColumns(5).getText should be ("24/24")
+      }
+    }
+  }
+
+  test("stages that aren't run appear as 'skipped stages' after a job finishes") {
+    withSpark(newSparkContext()) { sc =>
+      // Create an RDD that involves multiple stages:
+      val rdd =
+        sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+      // Run it twice; this will cause the second job to have two "phantom" stages that were
+      // mentioned in its job start event but which were never actually executed:
+      rdd.count()
+      rdd.count()
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs/job/?id=1")
+        find(id("pending")) should be (None)
+        find(id("active")) should be (None)
+        find(id("failed")) should be (None)
+        find(id("completed")).get.text should be ("Completed Stages (1)")
+        find(id("skipped")).get.text should be ("Skipped Stages (2)")
+        // Essentially, we want to check that none of the stage rows show
+        // "No data available for this stage". Checking for the absence of that string is brittle
+        // because someone could change the error message and cause this test to pass by accident.
+        // Instead, it's safer to check that each row contains a link to a stage details page.
+        findAll(cssSelector("tbody tr")).foreach { row =>
+          val link = row.underlying.findElement(By.xpath(".//a"))
+          link.getAttribute("href") should include ("stage")
+        }
+      }
+    }
+  }
+
+  test("jobs with stages that are skipped should show correct link descriptions on all jobs page") {
+    withSpark(newSparkContext()) { sc =>
+      // Create an RDD that involves multiple stages:
+      val rdd =
+        sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity).map(identity).groupBy(identity)
+      // Run it twice; this will cause the second job to have two "phantom" stages that were
+      // mentioned in its job start event but which were never actually executed:
+      rdd.count()
+      rdd.count()
+      eventually(timeout(10 seconds), interval(50 milliseconds)) {
+        go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
+        findAll(cssSelector("tbody tr a")).foreach { link =>
+          link.text.toLowerCase should include ("count")
+          link.text.toLowerCase should not include "unknown"
+        }
+      }
+    }
+  }
 }
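
All of the assertions in these tests lean on ScalaTest's eventually to poll until the asynchronously updated UI catches up with the listener. The retry pattern in isolation (the flag and background thread are illustrative):

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    object EventuallySketch {
      @volatile var ready = false

      def main(args: Array[String]): Unit = {
        new Thread(new Runnable {
          def run(): Unit = { Thread.sleep(100); ready = true }
        }).start()
        // Re-runs the block every 50 ms until it stops throwing, failing after 5 s.
        eventually(timeout(5 seconds), interval(50 milliseconds)) {
          assert(ready)
        }
      }
    }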

http://git-wip-us.apache.org/repos/asf/spark/blob/4a90276a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
index 15c5b4e..12af60c 100644
--- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -43,7 +43,10 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   }
 
   private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
-    SparkListenerJobStart(jobId, stageIds)
+    val stageInfos = stageIds.map { stageId =>
+      new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+    }
+    SparkListenerJobStart(jobId, stageInfos)
   }
 
   private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -52,8 +55,9 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
   }
 
   private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
+    val stagesThatWontBeRun = jobId * 200 to jobId * 200 + 10
     val stageIds = jobId * 100 to jobId * 100 + 50
-    listener.onJobStart(createJobStartEvent(jobId, stageIds))
+    listener.onJobStart(createJobStartEvent(jobId, stageIds ++ stagesThatWontBeRun))
     for (stageId <- stageIds) {
       listener.onStageSubmitted(createStageStartEvent(stageId))
       listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))

