Posted to commits@spark.apache.org by jo...@apache.org on 2014/10/25 09:07:44 UTC

git commit: [SPARK-2321] Stable pull-based progress / status API

Repository: spark
Updated Branches:
  refs/heads/master 3a845d3c0 -> 953031688


[SPARK-2321] Stable pull-based progress / status API

This pull request is a first step toward implementing a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)).  For now, I'd like to discuss the basic implementation, API names, and overall interface design.  Once we arrive at a good design, I'll go back and add additional methods to expose more information via these APIs.

#### Design goals:

- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Doesn't leak implementation details, preserving our freedom to change the implementation.

#### Implementation:

- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values.  These interfaces consist entirely of Java-style getter methods and are currently defined in Java.  I decided to explicitly separate the interfaces from their implementations (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
- Allow an existing JobProgressListener to be used when constructing a live SparkUI.  This allows us to re-use this listener in the implementation of the status API.  There are a few reasons why this listener re-use makes sense:
   - The status API and web UI are guaranteed to show consistent information.
   - These listeners are already well-tested.
   - The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.

The progress API methods are implemented in a separate trait that's mixed into SparkContext.  This helps keep SparkContext.scala from growing even larger and more difficult to read.
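As a rough sketch of how the new methods fit together from the driver side (illustrative only, not part of this patch; the object name, app name, job group string, and data sizes below are made up for the example), a caller could poll the API like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // brings the async actions (countAsync, etc.) into scope

object StatusAPISketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("StatusAPISketch"))
    sc.setJobGroup("demo-group", "status API sketch")

    // Launch a job asynchronously so the driver thread is free to poll progress.
    val future = sc.parallelize(1 to 100000, 4).map(identity).countAsync()

    while (!future.isCompleted) {
      for {
        jobId     <- sc.getJobIdsForGroup("demo-group").headOption
        jobInfo   <- sc.getJobInfo(jobId)        // None if the job info was garbage collected
        stageId   <- jobInfo.stageIds().headOption
        stageInfo <- sc.getStageInfo(stageId)    // None if the stage info was garbage collected
      } {
        println(s"Job $jobId is ${jobInfo.status()}: " +
          s"${stageInfo.numCompletedTasks()}/${stageInfo.numTasks()} tasks complete, " +
          s"${stageInfo.numActiveTasks()} active, ${stageInfo.numFailedTasks()} failed")
      }
      Thread.sleep(1000)
    }
    sc.stop()
  }
}
```

The `JavaStatusAPIDemo` example added by this patch shows the equivalent polling loop from Java via `JavaSparkContext`, where the getters return `null` instead of `None` when the information has been garbage collected.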

Author: Josh Rosen <jo...@databricks.com>
Author: Josh Rosen <jo...@apache.org>

Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits:

e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95303168
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95303168
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95303168

Branch: refs/heads/master
Commit: 9530316887612dca060a128fca34dd5a6ab2a9a9
Parents: 3a845d3
Author: Josh Rosen <jo...@databricks.com>
Authored: Sat Oct 25 00:06:57 2014 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Sat Oct 25 00:06:57 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/JobExecutionStatus.java    |  25 ++++
 .../java/org/apache/spark/SparkJobInfo.java     |  30 ++++
 .../java/org/apache/spark/SparkStageInfo.java   |  34 +++++
 .../scala/org/apache/spark/SparkContext.scala   |  76 ++--------
 .../scala/org/apache/spark/SparkStatusAPI.scala | 142 +++++++++++++++++++
 .../scala/org/apache/spark/StatusAPIImpl.scala  |  34 +++++
 .../spark/api/java/JavaSparkContext.scala       |  19 +++
 .../deploy/history/FsHistoryProvider.scala      |   2 +-
 .../org/apache/spark/deploy/master/Master.scala |   4 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     | 108 +++++++++-----
 .../apache/spark/ui/env/EnvironmentTab.scala    |   4 +-
 .../org/apache/spark/ui/exec/ExecutorsTab.scala |   3 +-
 .../spark/ui/jobs/JobProgressListener.scala     |  49 ++++++-
 .../apache/spark/ui/jobs/JobProgressPage.scala  |   9 +-
 .../apache/spark/ui/jobs/JobProgressTab.scala   |  10 +-
 .../org/apache/spark/ui/jobs/PoolPage.scala     |   3 +-
 .../scala/org/apache/spark/ui/jobs/UIData.scala |   8 ++
 .../apache/spark/ui/storage/StorageTab.scala    |   3 +-
 .../scala/org/apache/spark/StatusAPISuite.scala |  78 ++++++++++
 docs/configuration.md                           |  11 +-
 .../spark/examples/JavaStatusAPIDemo.java       |  70 +++++++++
 21 files changed, 588 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/java/org/apache/spark/JobExecutionStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/JobExecutionStatus.java b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
new file mode 100644
index 0000000..6e16131
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/JobExecutionStatus.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+public enum JobExecutionStatus {
+  RUNNING,
+  SUCCEEDED,
+  FAILED,
+  UNKNOWN
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/java/org/apache/spark/SparkJobInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkJobInfo.java b/core/src/main/java/org/apache/spark/SparkJobInfo.java
new file mode 100644
index 0000000..4e3c983
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkJobInfo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+/**
+ * Exposes information about Spark Jobs.
+ *
+ * This interface is not designed to be implemented outside of Spark.  We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkJobInfo {
+  int jobId();
+  int[] stageIds();
+  JobExecutionStatus status();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/java/org/apache/spark/SparkStageInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkStageInfo.java b/core/src/main/java/org/apache/spark/SparkStageInfo.java
new file mode 100644
index 0000000..04e2247
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+/**
+ * Exposes information about Spark Stages.
+ *
+ * This interface is not designed to be implemented outside of Spark.  We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkStageInfo {
+  int stageId();
+  int currentAttemptId();
+  String name();
+  int numTasks();
+  int numActiveTasks();
+  int numCompletedTasks();
+  int numFailedTasks();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4565832..e8fdfff 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
@@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
 import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage._
 import org.apache.spark.ui.SparkUI
+import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
 
 /**
@@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
  *   this config overrides the default configs as well as system properties.
  */
 
-class SparkContext(config: SparkConf) extends Logging {
+class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
 
   // This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
   // etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@@ -224,10 +224,15 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val metadataCleaner =
     new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
 
-  // Initialize the Spark UI, registering all associated listeners
+
+  private[spark] val jobProgressListener = new JobProgressListener(conf)
+  listenerBus.addListener(jobProgressListener)
+
+  // Initialize the Spark UI
   private[spark] val ui: Option[SparkUI] =
     if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(new SparkUI(this))
+      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
+        env.securityManager,appName))
     } else {
       // For tests, do not enable the UI
       None
@@ -855,69 +860,6 @@ class SparkContext(config: SparkConf) extends Logging {
   def version = SPARK_VERSION
 
   /**
-   * Return a map from the slave to the max memory available for caching and the remaining
-   * memory available for caching.
-   */
-  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
-    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
-      (blockManagerId.host + ":" + blockManagerId.port, mem)
-    }
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
-   * they take, etc.
-   */
-  @DeveloperApi
-  def getRDDStorageInfo: Array[RDDInfo] = {
-    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
-    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
-    rddInfos.filter(_.isCached)
-  }
-
-  /**
-   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
-   * Note that this does not necessarily mean the caching or computation was successful.
-   */
-  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
-  /**
-   * :: DeveloperApi ::
-   * Return information about blocks stored in all of the slaves
-   */
-  @DeveloperApi
-  def getExecutorStorageStatus: Array[StorageStatus] = {
-    env.blockManager.master.getStorageStatus
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return pools for fair scheduler
-   */
-  @DeveloperApi
-  def getAllPools: Seq[Schedulable] = {
-    // TODO(xiajunluan): We should take nested pools into account
-    taskScheduler.rootPool.schedulableQueue.toSeq
-  }
-
-  /**
-   * :: DeveloperApi ::
-   * Return the pool associated with the given name, if one exists
-   */
-  @DeveloperApi
-  def getPoolForName(pool: String): Option[Schedulable] = {
-    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
-  }
-
-  /**
-   * Return current scheduling mode
-   */
-  def getSchedulingMode: SchedulingMode.SchedulingMode = {
-    taskScheduler.schedulingMode
-  }
-
-  /**
    * Clear the job's list of files added by `addFile` so that they do not get downloaded to
    * any new nodes.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
new file mode 100644
index 0000000..1982499
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkStatusAPI.scala
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.Map
+import scala.collection.JavaConversions._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
+import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
+
+/**
+ * Trait that implements Spark's status APIs.  This trait is designed to be mixed into
+ * SparkContext; it allows the status API code to live in its own file.
+ */
+private[spark] trait SparkStatusAPI { this: SparkContext =>
+
+  /**
+   * Return a map from the slave to the max memory available for caching and the remaining
+   * memory available for caching.
+   */
+  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
+    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
+      (blockManagerId.host + ":" + blockManagerId.port, mem)
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
+   * they take, etc.
+   */
+  @DeveloperApi
+  def getRDDStorageInfo: Array[RDDInfo] = {
+    val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
+    StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
+    rddInfos.filter(_.isCached)
+  }
+
+  /**
+   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
+
+  /**
+   * :: DeveloperApi ::
+   * Return information about blocks stored in all of the slaves
+   */
+  @DeveloperApi
+  def getExecutorStorageStatus: Array[StorageStatus] = {
+    env.blockManager.master.getStorageStatus
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
+   */
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * Return the pool associated with the given name, if one exists
+   */
+  @DeveloperApi
+  def getPoolForName(pool: String): Option[Schedulable] = {
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
+  }
+
+  /**
+   * Return current scheduling mode
+   */
+  def getSchedulingMode: SchedulingMode.SchedulingMode = {
+    taskScheduler.schedulingMode
+  }
+
+
+  /**
+   * Return a list of all known jobs in a particular job group.  The returned list may contain
+   * running, failed, and completed jobs, and may vary across invocations of this method.  This
+   * method does not guarantee the order of the elements in its result.
+   */
+  def getJobIdsForGroup(jobGroup: String): Array[Int] = {
+    jobProgressListener.synchronized {
+      val jobData = jobProgressListener.jobIdToData.valuesIterator
+      jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
+    }
+  }
+
+  /**
+   * Returns job information, or `None` if the job info could not be found or was garbage collected.
+   */
+  def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
+    jobProgressListener.synchronized {
+      jobProgressListener.jobIdToData.get(jobId).map { data =>
+        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
+      }
+    }
+  }
+
+  /**
+   * Returns stage information, or `None` if the stage info could not be found or was
+   * garbage collected.
+   */
+  def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
+    jobProgressListener.synchronized {
+      for (
+        info <- jobProgressListener.stageIdToInfo.get(stageId);
+        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
+      ) yield {
+        new SparkStageInfoImpl(
+          stageId,
+          info.attemptId,
+          info.name,
+          info.numTasks,
+          data.numActiveTasks,
+          data.numCompleteTasks,
+          data.numFailedTasks)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
new file mode 100644
index 0000000..90b47c8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+private class SparkJobInfoImpl (
+  val jobId: Int,
+  val stageIds: Array[Int],
+  val status: JobExecutionStatus)
+ extends SparkJobInfo
+
+private class SparkStageInfoImpl(
+  val stageId: Int,
+  val currentAttemptId: Int,
+  val name: String,
+  val numTasks: Int,
+  val numActiveTasks: Int,
+  val numCompletedTasks: Int,
+  val numFailedTasks: Int)
+ extends SparkStageInfo

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 791d853..45168ba 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -132,6 +132,25 @@ class JavaSparkContext(val sc: SparkContext)
   /** Default min number of partitions for Hadoop RDDs when not given by user */
   def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
 
+
+  /**
+   * Return a list of all known jobs in a particular job group.  The returned list may contain
+   * running, failed, and completed jobs, and may vary across invocations of this method.  This
+   * method does not guarantee the order of the elements in its result.
+   */
+  def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
+
+  /**
+   * Returns job information, or `null` if the job info could not be found or was garbage collected.
+   */
+  def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
+
+  /**
+   * Returns stage information, or `null` if the stage info could not be found or was
+   * garbage collected.
+   */
+  def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
+
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
     implicit val ctag: ClassTag[T] = fakeClassTag

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 481f6c9..2d1609b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         val ui = {
           val conf = this.conf.clone()
           val appSecManager = new SecurityManager(conf)
-          new SparkUI(conf, appSecManager, replayBus, appId,
+          SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
             s"${HistoryServer.UI_PATH_PREFIX}/$appId")
           // Do not call ui.bind() to avoid creating a new server for each application
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 3b6bb9f..2f81d47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -721,8 +721,8 @@ private[spark] class Master(
 
     try {
       val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
-      val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
-        HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+      val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
+        appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       replayBus.replay()
       appIdToUI(app.id) = ui
       webUi.attachSparkUI(ui)

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index cccd59d..049938f 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -21,47 +21,30 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentTab
-import org.apache.spark.ui.exec.ExecutorsTab
-import org.apache.spark.ui.jobs.JobProgressTab
-import org.apache.spark.ui.storage.StorageTab
+import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
+import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
+import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
+import org.apache.spark.ui.storage.{StorageListener, StorageTab}
 
 /**
  * Top level user interface for a Spark application.
  */
-private[spark] class SparkUI(
-    val sc: SparkContext,
+private[spark] class SparkUI private (
+    val sc: Option[SparkContext],
     val conf: SparkConf,
     val securityManager: SecurityManager,
-    val listenerBus: SparkListenerBus,
+    val environmentListener: EnvironmentListener,
+    val storageStatusListener: StorageStatusListener,
+    val executorsListener: ExecutorsListener,
+    val jobProgressListener: JobProgressListener,
+    val storageListener: StorageListener,
     var appName: String,
-    val basePath: String = "")
+    val basePath: String)
   extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
   with Logging {
 
-  def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
-  def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
-    this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
-
-  def this(
-      conf: SparkConf,
-      securityManager: SecurityManager,
-      listenerBus: SparkListenerBus,
-      appName: String,
-      basePath: String) =
-    this(null, conf, securityManager, listenerBus, appName, basePath)
-
-  // If SparkContext is not provided, assume the associated application is not live
-  val live = sc != null
-
-  // Maintain executor storage status through Spark events
-  val storageStatusListener = new StorageStatusListener
-
-  initialize()
-
   /** Initialize all components of the server. */
   def initialize() {
-    listenerBus.addListener(storageStatusListener)
     val jobProgressTab = new JobProgressTab(this)
     attachTab(jobProgressTab)
     attachTab(new StorageTab(this))
@@ -71,10 +54,10 @@ private[spark] class SparkUI(
     attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
     attachHandler(
       createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
-    if (live) {
-      sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
-    }
+    // If the UI is live, then serve
+    sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
   }
+  initialize()
 
   def getAppName = appName
 
@@ -83,11 +66,6 @@ private[spark] class SparkUI(
     appName = name
   }
 
-  /** Register the given listener with the listener bus. */
-  def registerListener(listener: SparkListener) {
-    listenerBus.addListener(listener)
-  }
-
   /** Stop the server behind this web interface. Only valid after bind(). */
   override def stop() {
     super.stop()
@@ -116,4 +94,60 @@ private[spark] object SparkUI {
   def getUIPort(conf: SparkConf): Int = {
     conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
   }
+
+  def createLiveUI(
+      sc: SparkContext,
+      conf: SparkConf,
+      listenerBus: SparkListenerBus,
+      jobProgressListener: JobProgressListener,
+      securityManager: SecurityManager,
+      appName: String): SparkUI =  {
+    create(Some(sc), conf, listenerBus, securityManager, appName,
+      jobProgressListener = Some(jobProgressListener))
+  }
+
+  def createHistoryUI(
+      conf: SparkConf,
+      listenerBus: SparkListenerBus,
+      securityManager: SecurityManager,
+      appName: String,
+      basePath: String): SparkUI = {
+    create(None, conf, listenerBus, securityManager, appName, basePath)
+  }
+
+  /**
+   * Create a new Spark UI.
+   *
+   * @param sc optional SparkContext; this can be None when reconstituting a UI from event logs.
+   * @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the
+   *                            web UI will create and register its own JobProgressListener.
+   */
+  private def create(
+      sc: Option[SparkContext],
+      conf: SparkConf,
+      listenerBus: SparkListenerBus,
+      securityManager: SecurityManager,
+      appName: String,
+      basePath: String = "",
+      jobProgressListener: Option[JobProgressListener] = None): SparkUI = {
+
+    val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
+      val listener = new JobProgressListener(conf)
+      listenerBus.addListener(listener)
+      listener
+    }
+
+    val environmentListener = new EnvironmentListener
+    val storageStatusListener = new StorageStatusListener
+    val executorsListener = new ExecutorsListener(storageStatusListener)
+    val storageListener = new StorageListener(storageStatusListener)
+
+    listenerBus.addListener(environmentListener)
+    listenerBus.addListener(storageStatusListener)
+    listenerBus.addListener(executorsListener)
+    listenerBus.addListener(storageListener)
+
+    new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
+      executorsListener, _jobProgressListener, storageListener, appName, basePath)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
index 0d158fb..f62260c 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -22,10 +22,8 @@ import org.apache.spark.scheduler._
 import org.apache.spark.ui._
 
 private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
-  val listener = new EnvironmentListener
-
+  val listener = parent.environmentListener
   attachPage(new EnvironmentPage(this))
-  parent.registerListener(listener)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
index 61eb111..689cf02 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -26,10 +26,9 @@ import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
-  val listener = new ExecutorsListener(parent.storageStatusListener)
+  val listener = parent.executorsListener
 
   attachPage(new ExecutorsPage(this))
-  parent.registerListener(listener)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index eaeb861..b520736 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -40,17 +40,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   import JobProgressListener._
 
+  type JobId = Int
+  type StageId = Int
+  type StageAttemptId = Int
+
   // How many stages to remember
   val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
+  // How many jobs to remember
+  val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
 
-  // Map from stageId to StageInfo
-  val activeStages = new HashMap[Int, StageInfo]
-
-  // Map from (stageId, attemptId) to StageUIData
-  val stageIdToData = new HashMap[(Int, Int), StageUIData]
+  val activeJobs = new HashMap[JobId, JobUIData]
+  val completedJobs = ListBuffer[JobUIData]()
+  val failedJobs = ListBuffer[JobUIData]()
+  val jobIdToData = new HashMap[JobId, JobUIData]
 
+  val activeStages = new HashMap[StageId, StageInfo]
   val completedStages = ListBuffer[StageInfo]()
   val failedStages = ListBuffer[StageInfo]()
+  val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
+  val stageIdToInfo = new HashMap[StageId, StageInfo]
 
   // Map from pool name to a hash map (map from stage id to StageInfo).
   val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
@@ -61,8 +69,32 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 
   def blockManagerIds = executorIdToBlockManagerId.values.toSeq
 
+  override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
+    val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
+    val jobData: JobUIData =
+      new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING)
+    jobIdToData(jobStart.jobId) = jobData
+    activeJobs(jobStart.jobId) = jobData
+  }
+
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
+    val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
+      logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
+      new JobUIData(jobId = jobEnd.jobId)
+    }
+    jobEnd.jobResult match {
+      case JobSucceeded =>
+        completedJobs += jobData
+        jobData.status = JobExecutionStatus.SUCCEEDED
+      case JobFailed(exception) =>
+        failedJobs += jobData
+        jobData.status = JobExecutionStatus.FAILED
+    }
+  }
+
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
     val stage = stageCompleted.stageInfo
+    stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
       logWarning("Stage completed for unknown stage " + stage.stageId)
       new StageUIData
@@ -89,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
   private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
     if (stages.size > retainedStages) {
       val toRemove = math.max(retainedStages / 10, 1)
-      stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
+      stages.take(toRemove).foreach { s =>
+        stageIdToData.remove((s.stageId, s.attemptId))
+        stageIdToInfo.remove(s.stageId)
+      }
       stages.trimStart(toRemove)
     }
   }
@@ -103,6 +138,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
     }.getOrElse(DEFAULT_POOL_NAME)
 
+    stageIdToInfo(stage.stageId) = stage
     val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
     stageData.schedulingPool = poolName
 
@@ -277,4 +313,5 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
 private object JobProgressListener {
   val DEFAULT_POOL_NAME = "default"
   val DEFAULT_RETAINED_STAGES = 1000
+  val DEFAULT_RETAINED_JOBS = 1000
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
index 1e02f12..6e718ee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala
@@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing list of all ongoing and recently finished stages and pools */
 private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
-  private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
   private def isFairScheduler = parent.isFairScheduler
@@ -47,17 +46,17 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
         new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
 
       // For now, pool information is only accessible in live UIs
-      val pools = if (live) sc.getAllPools else Seq[Schedulable]()
+      val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
       val poolTable = new PoolTable(pools, parent)
 
       val summary: NodeSeq =
         <div>
           <ul class="unstyled">
-            {if (live) {
+            {if (sc.isDefined) {
               // Total duration is not meaningful unless the UI is live
               <li>
                 <strong>Total Duration: </strong>
-                {UIUtils.formatDuration(now - sc.startTime)}
+                {UIUtils.formatDuration(now - sc.get.startTime)}
               </li>
             }}
             <li>
@@ -80,7 +79,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
         </div>
 
       val content = summary ++
-        {if (live && isFairScheduler) {
+        {if (sc.isDefined && isFairScheduler) {
           <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
         } else {
           Seq[Node]()

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
index c16542c..03ca918 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -25,16 +25,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
-  val live = parent.live
   val sc = parent.sc
-  val conf = if (live) sc.conf else new SparkConf
-  val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
-  val listener = new JobProgressListener(conf)
+  val conf = sc.map(_.conf).getOrElse(new SparkConf)
+  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
+  val listener = parent.jobProgressListener
 
   attachPage(new JobProgressPage(this))
   attachPage(new StagePage(this))
   attachPage(new PoolPage(this))
-  parent.registerListener(listener)
 
   def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 
@@ -43,7 +41,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st
       val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
       val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
       if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
-        sc.cancelStage(stageId)
+        sc.get.cancelStage(stageId)
       }
       // Do a quick pause here to give Spark time to kill the stage so it shows up as
       // killed after the refresh. Note that this will block the serving thread so the

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index 7a6c7d1..770d99e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
 
 /** Page showing specific pool details */
 private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
-  private val live = parent.live
   private val sc = parent.sc
   private val listener = parent.listener
 
@@ -42,7 +41,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
         new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent)
 
       // For now, pool information is only accessible in live UIs
-      val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()
+      val pools = sc.map(_.getPoolForName(poolName).get).toSeq
       val poolTable = new PoolTable(pools, parent)
 
       val content =

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
index a336bf7..e2813f8 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
+import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
 import org.apache.spark.util.collection.OpenHashSet
@@ -36,6 +37,13 @@ private[jobs] object UIData {
     var diskBytesSpilled : Long = 0
   }
 
+  class JobUIData(
+    var jobId: Int = -1,
+    var stageIds: Seq[Int] = Seq.empty,
+    var jobGroup: Option[String] = None,
+    var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN
+  )
+
   class StageUIData {
     var numActiveTasks: Int = _
     var numCompleteTasks: Int = _

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
index 76097f1..a81291d 100644
--- a/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala
@@ -26,11 +26,10 @@ import org.apache.spark.storage._
 
 /** Web UI showing storage status of all RDD's in the given SparkContext. */
 private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") {
-  val listener = new StorageListener(parent.storageStatusListener)
+  val listener = parent.storageListener
 
   attachPage(new StoragePage(this))
   attachPage(new RDDPage(this))
-  parent.registerListener(listener)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/StatusAPISuite.scala b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
new file mode 100644
index 0000000..4468fba
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/StatusAPISuite.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.concurrent.duration._
+import scala.language.implicitConversions
+import scala.language.postfixOps
+
+import org.scalatest.{Matchers, FunSuite}
+import org.scalatest.concurrent.Eventually._
+
+import org.apache.spark.JobExecutionStatus._
+import org.apache.spark.SparkContext._
+
+class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
+
+  test("basic status API usage") {
+    val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
+    val jobId: Int = eventually(timeout(10 seconds)) {
+      val jobIds = jobFuture.jobIds
+      jobIds.size should be(1)
+      jobIds.head
+    }
+    val jobInfo = eventually(timeout(10 seconds)) {
+      sc.getJobInfo(jobId).get
+    }
+    jobInfo.status() should not be FAILED
+    val stageIds = jobInfo.stageIds()
+    stageIds.size should be(2)
+
+    val firstStageInfo = eventually(timeout(10 seconds)) {
+      sc.getStageInfo(stageIds(0)).get
+    }
+    firstStageInfo.stageId() should be(stageIds(0))
+    firstStageInfo.currentAttemptId() should be(0)
+    firstStageInfo.numTasks() should be(2)
+    eventually(timeout(10 seconds)) {
+      val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
+      updatedFirstStageInfo.numCompletedTasks() should be(2)
+      updatedFirstStageInfo.numActiveTasks() should be(0)
+      updatedFirstStageInfo.numFailedTasks() should be(0)
+    }
+  }
+
+  test("getJobIdsForGroup()") {
+    sc.setJobGroup("my-job-group", "description")
+    sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
+    val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
+    val firstJobId = eventually(timeout(10 seconds)) {
+      firstJobFuture.jobIds.head
+    }
+    eventually(timeout(10 seconds)) {
+      sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
+    }
+    val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
+    val secondJobId = eventually(timeout(10 seconds)) {
+      secondJobFuture.jobIds.head
+    }
+    eventually(timeout(10 seconds)) {
+      sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 66738d3..3007706 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -375,7 +375,16 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.ui.retainedStages</code></td>
   <td>1000</td>
   <td>
-    How many stages the Spark UI remembers before garbage collecting.
+    How many stages the Spark UI and status APIs remember before garbage
+    collecting.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.ui.retainedJobs</code></td>
+  <td>1000</td>
+  <td>
+    How many jobs the Spark UI and status APIs remember before garbage
+    collecting.
   </td>
 </tr>
 <tr>

http://git-wip-us.apache.org/repos/asf/spark/blob/95303168/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
new file mode 100644
index 0000000..430e96a
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/JavaStatusAPIDemo.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkJobInfo;
+import org.apache.spark.SparkStageInfo;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Example of using Spark's status APIs from Java.
+ */
+public final class JavaStatusAPIDemo {
+
+  public static final String APP_NAME = "JavaStatusAPIDemo";
+
+  public static final class IdentityWithDelay<T> implements Function<T, T> {
+    @Override
+    public T call(T x) throws Exception {
+      Thread.sleep(2 * 1000);  // 2 seconds
+      return x;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
+    final JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    // Example of implementing a progress reporter for a simple job.
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
+        new IdentityWithDelay<Integer>());
+    JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
+    while (!jobFuture.isDone()) {
+      Thread.sleep(1000);  // 1 second
+      List<Integer> jobIds = jobFuture.jobIds();
+      if (jobIds.isEmpty()) {
+        continue;
+      }
+      int currentJobId = jobIds.get(jobIds.size() - 1);
+      SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
+      SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
+      System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
+          " active, " + stageInfo.numCompletedTasks() + " complete");
+    }
+
+    System.out.println("Job results are: " + jobFuture.get());
+    sc.stop();
+  }
+}

