You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/03 23:57:59 UTC

git commit: [SPARK-3388] Expose application ID in ApplicationStart event, use it in history server.

Repository: spark
Updated Branches:
  refs/heads/master ccc69e26e -> f2b5b619a


[SPARK-3388] Expose application ID in ApplicationStart event, use it in history server.

This change exposes the application ID generated by the Spark Master, Mesos or Yarn
via the SparkListenerApplicationStart event. It then uses that information to expose the
application via its ID in the history server, instead of using the internal directory name
generated by the event logger as an application id. This allows someone who knows
the application ID to easily figure out the URL for the application's entry in the HS, aside
from looking better.

In Yarn mode, this is used to generate a direct link from the RM application list to the
Spark history server entry (thus providing a fix for SPARK-2150).

Note that this relies on the assumption that the different cluster managers generate app ids
that are sufficiently different from each other that clashes will not occur.

Author: Marcelo Vanzin <va...@cloudera.com>

This patch had conflicts when merged, resolved by
Committer: Andrew Or <an...@gmail.com>

Closes #1218 from vanzin/yarn-hs-link-2 and squashes the following commits:

2d19f3c [Marcelo Vanzin] Review feedback.
6706d3a [Marcelo Vanzin] Implement applicationId() in base classes.
56fe42e [Marcelo Vanzin] Fix cluster mode history address, plus a cleanup.
44112a8 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
8278316 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a86bbcf [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
a0056e6 [Marcelo Vanzin] Unbreak test.
4b10cfd [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
cb0cab2 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
25f2826 [Marcelo Vanzin] Add MIMA excludes.
f0ba90f [Marcelo Vanzin] Use BufferedIterator.
c90a08d [Marcelo Vanzin] Remove unused code.
3f8ec66 [Marcelo Vanzin] Review feedback.
21aa71b [Marcelo Vanzin] Fix JSON test.
b022bae [Marcelo Vanzin] Undo SparkContext cleanup.
c6d7478 [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
4e3483f [Marcelo Vanzin] Fix test.
57517b8 [Marcelo Vanzin] Review feedback. Mostly, more consistent use of Scala's Option.
311e49d [Marcelo Vanzin] Merge branch 'master' into yarn-hs-link-2
d35d86f [Marcelo Vanzin] Fix yarn backend after rebase.
36dc362 [Marcelo Vanzin] Don't use Iterator::takeWhile().
0afd696 [Marcelo Vanzin] Wait until master responds before returning from start().
abc4697 [Marcelo Vanzin] Make FsHistoryProvider keep a map of applications by id.
26b266e [Marcelo Vanzin] Use Mesos framework ID as Spark application ID.
b3f3664 [Marcelo Vanzin] [yarn] Make the RM link point to the app directly in the HS.
2fb7de4 [Marcelo Vanzin] Expose the application ID in the ApplicationStart event.
ed10348 [Marcelo Vanzin] Expose application id to spark context.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f2b5b619
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f2b5b619
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f2b5b619

Branch: refs/heads/master
Commit: f2b5b619a9efee91573c0e546792e68e72afce21
Parents: ccc69e2
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 3 14:57:38 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Sep 3 14:57:38 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   5 +-
 .../history/ApplicationHistoryProvider.scala    |   6 +-
 .../deploy/history/FsHistoryProvider.scala      | 176 +++++++++++--------
 .../spark/deploy/history/HistoryServer.scala    |   5 +-
 .../scheduler/ApplicationEventListener.scala    |  35 ++--
 .../spark/scheduler/SchedulerBackend.scala      |   8 +
 .../apache/spark/scheduler/SparkListener.scala  |   4 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |   8 +
 .../spark/scheduler/TaskSchedulerImpl.scala     |   4 +
 .../cluster/CoarseGrainedSchedulerBackend.scala |   4 +-
 .../cluster/SimrSchedulerBackend.scala          |   1 +
 .../cluster/SparkDeploySchedulerBackend.scala   |  28 +++
 .../mesos/CoarseMesosSchedulerBackend.scala     |   1 +
 .../cluster/mesos/MesosSchedulerBackend.scala   |   1 +
 .../spark/scheduler/local/LocalBackend.scala    |   1 +
 .../scala/org/apache/spark/ui/UIUtils.scala     |   2 +-
 .../org/apache/spark/util/JsonProtocol.scala    |   4 +-
 .../scheduler/EventLoggingListenerSuite.scala   |   3 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |   3 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |   9 +-
 project/MimaExcludes.scala                      |   6 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  24 ++-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  14 --
 .../cluster/YarnClientSchedulerBackend.scala    |   4 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |   5 +-
 25 files changed, 228 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cb4fb7c..529febf 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1261,7 +1261,10 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /** Post the application start event */
   private def postApplicationStart() {
-    listenerBus.post(SparkListenerApplicationStart(appName, startTime, sparkUser))
+    // Note: this code assumes that the task scheduler has been initialized and has contacted
+    // the cluster manager to get an application ID (in case the cluster manager provides one).
+    listenerBus.post(SparkListenerApplicationStart(appName, taskScheduler.applicationId(),
+      startTime, sparkUser))
   }
 
   /** Post the application end event */

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index a0e8bd4..fbe39b2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -34,15 +34,15 @@ private[spark] abstract class ApplicationHistoryProvider {
    *
    * @return List of all know applications.
    */
-  def getListing(): Seq[ApplicationHistoryInfo]
+  def getListing(): Iterable[ApplicationHistoryInfo]
 
   /**
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
-   * @return The application's UI, or null if application is not found.
+   * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): SparkUI
+  def getAppUI(appId: String): Option[SparkUI]
 
   /**
    * Called when the server is shutting down.

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 05c8a90..481f6c9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,6 +32,8 @@ import org.apache.spark.util.Utils
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {
 
+  private val NOT_STARTED = "<Not Started>"
+
   // Interval between each check for event log updates
   private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
     conf.getInt("spark.history.updateInterval", 10)) * 1000
@@ -47,8 +49,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
 
-  // List of applications, in order from newest to oldest.
-  @volatile private var appList: Seq[ApplicationHistoryInfo] = Nil
+  // The modification time of the newest log detected during the last scan. This is used
+  // to ignore logs that are older during subsequent scans, to avoid processing data that
+  // is already known.
+  private var lastModifiedTime = -1L
+
+  // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
+  // into the map in order, so the LinkedHashMap maintains the correct ordering.
+  @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
+    = new mutable.LinkedHashMap()
 
   /**
    * A background thread that periodically checks for event log updates on disk.
@@ -93,15 +102,35 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     logCheckingThread.start()
   }
 
-  override def getListing() = appList
+  override def getListing() = applications.values
 
-  override def getAppUI(appId: String): SparkUI = {
+  override def getAppUI(appId: String): Option[SparkUI] = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
-      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
-      ui
+      applications.get(appId).map { info =>
+        val (replayBus, appListener) = createReplayBus(fs.getFileStatus(
+          new Path(logDir, info.logDir)))
+        val ui = {
+          val conf = this.conf.clone()
+          val appSecManager = new SecurityManager(conf)
+          new SparkUI(conf, appSecManager, replayBus, appId,
+            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
+          // Do not call ui.bind() to avoid creating a new server for each application
+        }
+
+        replayBus.replay()
+
+        ui.setAppName(s"${appListener.appName.getOrElse(NOT_STARTED)} ($appId)")
+
+        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+        ui.getSecurityManager.setAcls(uiAclsEnabled)
+        // make sure to set admin acls before view acls so they are properly picked up
+        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+        ui.getSecurityManager.setViewAcls(appListener.sparkUser.getOrElse(NOT_STARTED),
+          appListener.viewAcls.getOrElse(""))
+        ui
+      }
     } catch {
-      case e: FileNotFoundException => null
+      case e: FileNotFoundException => None
     }
   }
 
@@ -119,84 +148,79 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     try {
       val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter { dir =>
-        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
-      }
 
-      val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => app.id -> app):_*)
-
-      // For any application that either (i) is not listed or (ii) has changed since the last time
-      // the listing was created (defined by the log dir's modification time), load the app's info.
-      // Otherwise just reuse what's already in memory.
-      val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo](logInfos.size)
-      for (dir <- logInfos) {
-        val curr = currentApps.getOrElse(dir.getPath().getName(), null)
-        if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
+      // Load all new logs from the log directory. Only directories that have a modification time
+      // later than the last known log directory will be loaded.
+      var newLastModifiedTime = lastModifiedTime
+      val logInfos = logDirs
+        .filter { dir =>
+          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
+            val modTime = getModificationTime(dir)
+            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
+            modTime > lastModifiedTime
+          } else {
+            false
+          }
+        }
+        .flatMap { dir =>
           try {
-            val (app, _) = loadAppInfo(dir, renderUI = false)
-            newApps += app
+            val (replayBus, appListener) = createReplayBus(dir)
+            replayBus.replay()
+            Some(new FsApplicationHistoryInfo(
+              dir.getPath().getName(),
+              appListener.appId.getOrElse(dir.getPath().getName()),
+              appListener.appName.getOrElse(NOT_STARTED),
+              appListener.startTime.getOrElse(-1L),
+              appListener.endTime.getOrElse(-1L),
+              getModificationTime(dir),
+              appListener.sparkUser.getOrElse(NOT_STARTED)))
           } catch {
-            case e: Exception => logError(s"Failed to load app info from directory $dir.")
+            case e: Exception =>
+              logInfo(s"Failed to load application log data from $dir.", e)
+              None
+          }
+        }
+        .sortBy { info => -info.endTime }
+
+      lastModifiedTime = newLastModifiedTime
+
+      // When there are new logs, merge the new list with the existing one, maintaining
+      // the expected ordering (descending end time). Maintaining the order is important
+      // to avoid having to sort the list every time there is a request for the log list.
+      if (!logInfos.isEmpty) {
+        val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+        def addIfAbsent(info: FsApplicationHistoryInfo) = {
+          if (!newApps.contains(info.id)) {
+            newApps += (info.id -> info)
           }
-        } else {
-          newApps += curr
         }
-      }
 
-      appList = newApps.sortBy { info => -info.endTime }
+        val newIterator = logInfos.iterator.buffered
+        val oldIterator = applications.values.iterator.buffered
+        while (newIterator.hasNext && oldIterator.hasNext) {
+          if (newIterator.head.endTime > oldIterator.head.endTime) {
+            addIfAbsent(newIterator.next)
+          } else {
+            addIfAbsent(oldIterator.next)
+          }
+        }
+        newIterator.foreach(addIfAbsent)
+        oldIterator.foreach(addIfAbsent)
+
+        applications = newApps
+      }
     } catch {
       case t: Throwable => logError("Exception in checking for event log updates", t)
     }
   }
 
-  /**
-   * Parse the application's logs to find out the information we need to build the
-   * listing page.
-   *
-   * When creating the listing of available apps, there is no need to load the whole UI for the
-   * application. The UI is requested by the HistoryServer (by calling getAppInfo()) when the user
-   * clicks on a specific application.
-   *
-   * @param logDir Directory with application's log files.
-   * @param renderUI Whether to create the SparkUI for the application.
-   * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
-   */
-  private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val path = logDir.getPath
-    val appId = path.getName
+  private def createReplayBus(logDir: FileStatus): (ReplayListenerBus, ApplicationEventListener) = {
+    val path = logDir.getPath()
     val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
-
-    val ui: SparkUI = if (renderUI) {
-        val conf = this.conf.clone()
-        val appSecManager = new SecurityManager(conf)
-        new SparkUI(conf, appSecManager, replayBus, appId,
-          HistoryServer.UI_PATH_PREFIX + s"/$appId")
-        // Do not call ui.bind() to avoid creating a new server for each application
-      } else {
-        null
-      }
-
-    replayBus.replay()
-    val appInfo = ApplicationHistoryInfo(
-      appId,
-      appListener.appName,
-      appListener.startTime,
-      appListener.endTime,
-      getModificationTime(logDir),
-      appListener.sparkUser)
-
-    if (ui != null) {
-      val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-      ui.getSecurityManager.setAcls(uiAclsEnabled)
-      // make sure to set admin acls before view acls so properly picked up
-      ui.getSecurityManager.setAdminAcls(appListener.adminAcls)
-      ui.getSecurityManager.setViewAcls(appListener.sparkUser, appListener.viewAcls)
-    }
-    (appInfo, ui)
+    (replayBus, appListener)
   }
 
   /** Return when this directory was last modified. */
@@ -219,3 +243,13 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def getMonotonicTimeMs() = System.nanoTime() / (1000 * 1000)
 
 }
+
+private class FsApplicationHistoryInfo(
+    val logDir: String,
+    id: String,
+    name: String,
+    startTime: Long,
+    endTime: Long,
+    lastUpdated: Long,
+    sparkUser: String)
+  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser)

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index d1a64c1..ce00c0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,10 +52,7 @@ class HistoryServer(
 
   private val appLoader = new CacheLoader[String, SparkUI] {
     override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key)
-      if (ui == null) {
-        throw new NoSuchElementException()
-      }
+      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
       attachSparkUI(ui)
       ui
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 162158b..6d39a5e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -24,38 +24,31 @@ package org.apache.spark.scheduler
  * from multiple applications are seen, the behavior is unspecified.
  */
 private[spark] class ApplicationEventListener extends SparkListener {
-  var appName = "<Not Started>"
-  var sparkUser = "<Not Started>"
-  var startTime = -1L
-  var endTime = -1L
-  var viewAcls = ""
-  var adminAcls = ""
-
-  def applicationStarted = startTime != -1
-
-  def applicationCompleted = endTime != -1
-
-  def applicationDuration: Long = {
-    val difference = endTime - startTime
-    if (applicationStarted && applicationCompleted && difference > 0) difference else -1L
-  }
+  var appName: Option[String] = None
+  var appId: Option[String] = None
+  var sparkUser: Option[String] = None
+  var startTime: Option[Long] = None
+  var endTime: Option[Long] = None
+  var viewAcls: Option[String] = None
+  var adminAcls: Option[String] = None
 
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
-    appName = applicationStart.appName
-    startTime = applicationStart.time
-    sparkUser = applicationStart.sparkUser
+    appName = Some(applicationStart.appName)
+    appId = applicationStart.appId
+    startTime = Some(applicationStart.time)
+    sparkUser = Some(applicationStart.sparkUser)
   }
 
   override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-    endTime = applicationEnd.time
+    endTime = Some(applicationEnd.time)
   }
 
   override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
     synchronized {
       val environmentDetails = environmentUpdate.environmentDetails
       val allProperties = environmentDetails("Spark Properties").toMap
-      viewAcls = allProperties.getOrElse("spark.ui.view.acls", "")
-      adminAcls = allProperties.getOrElse("spark.admin.acls", "")
+      viewAcls = allProperties.get("spark.ui.view.acls")
+      adminAcls = allProperties.get("spark.admin.acls")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index e41e0a9..a0be830 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -31,4 +31,12 @@ private[spark] trait SchedulerBackend {
   def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
     throw new UnsupportedOperationException
   def isReady(): Boolean = true
+
+  /**
+   * The application ID associated with the job, if any.
+   *
+   * @return The application ID, or None if the backend does not provide an ID.
+   */
+  def applicationId(): Option[String] = None
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index f33c2e0..86afe3b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -89,8 +89,8 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, time: Long, sparkUser: String)
-  extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
+  sparkUser: String) extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index 1a0b877..1c1ce66 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -64,4 +64,12 @@ private[spark] trait TaskScheduler {
    */
   def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
     blockManagerId: BlockManagerId): Boolean
+
+  /**
+   * The application ID associated with the job, if any.
+   *
+   * @return The application ID, or None if the backend does not provide an ID.
+   */
+  def applicationId(): Option[String] = None
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ad051e5..633e892 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -491,6 +491,9 @@ private[spark] class TaskSchedulerImpl(
       }
     }
   }
+
+  override def applicationId(): Option[String] = backend.applicationId()
+
 }
 
 
@@ -535,4 +538,5 @@ private[spark] object TaskSchedulerImpl {
 
     retval.toList
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2a3711a..5b52572 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -51,12 +51,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val conf = scheduler.sc.conf
   private val timeout = AkkaUtils.askTimeout(conf)
   private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-  // Submit tasks only after (registered resources / total expected resources) 
+  // Submit tasks only after (registered resources / total expected resources)
   // is equal to at least this value, that is double between 0 and 1.
   var minRegisteredRatio =
     math.min(1, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0))
   // Submit tasks after maxRegisteredWaitingTime milliseconds
-  // if minRegisteredRatio has not yet been reached  
+  // if minRegisteredRatio has not yet been reached
   val maxRegisteredWaitingTime =
     conf.getInt("spark.scheduler.maxRegisteredResourcesWaitingTime", 30000)
   val createTime = System.currentTimeMillis()

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
index bc7670f..513d74a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -69,4 +69,5 @@ private[spark] class SimrSchedulerBackend(
     fs.delete(new Path(driverFilePath), false)
     super.stop()
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 32138e5..06872ac 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -34,6 +34,10 @@ private[spark] class SparkDeploySchedulerBackend(
   var client: AppClient = null
   var stopping = false
   var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _
+  var appId: String = _
+
+  val registrationLock = new Object()
+  var registrationDone = false
 
   val maxCores = conf.getOption("spark.cores.max").map(_.toInt)
   val totalExpectedCores = maxCores.getOrElse(0)
@@ -68,6 +72,8 @@ private[spark] class SparkDeploySchedulerBackend(
 
     client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
     client.start()
+
+    waitForRegistration()
   }
 
   override def stop() {
@@ -81,15 +87,19 @@ private[spark] class SparkDeploySchedulerBackend(
 
   override def connected(appId: String) {
     logInfo("Connected to Spark cluster with app ID " + appId)
+    this.appId = appId
+    notifyContext()
   }
 
   override def disconnected() {
+    notifyContext()
     if (!stopping) {
       logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
     }
   }
 
   override def dead(reason: String) {
+    notifyContext()
     if (!stopping) {
       logError("Application has been killed. Reason: " + reason)
       scheduler.error(reason)
@@ -116,4 +126,22 @@ private[spark] class SparkDeploySchedulerBackend(
   override def sufficientResourcesRegistered(): Boolean = {
     totalCoreCount.get() >= totalExpectedCores * minRegisteredRatio
   }
+
+  override def applicationId(): Option[String] = Option(appId)
+
+  private def waitForRegistration() = {
+    registrationLock.synchronized {
+      while (!registrationDone) {
+        registrationLock.wait()
+      }
+    }
+  }
+
+  private def notifyContext() = {
+    registrationLock.synchronized {
+      registrationDone = true
+      registrationLock.notifyAll()
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 87e181e..da43ef5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -309,4 +309,5 @@ private[spark] class CoarseMesosSchedulerBackend(
     logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
     slaveLost(d, s)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 67ee4d6..a9ef126 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -349,4 +349,5 @@ private[spark] class MesosSchedulerBackend(
 
   // TODO: query Mesos for number of cores
   override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8)
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index bec9502..9ea25c2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -114,4 +114,5 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index bee6dad..f0006b4 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -232,7 +232,7 @@ private[spark] object UIUtils extends Logging {
   def listingTable[T](
       headers: Seq[String],
       generateDataRow: T => Seq[Node],
-      data: Seq[T],
+      data: Iterable[T],
       fixedWidth: Boolean = false): Seq[Node] = {
 
     var listingTableClass = TABLE_CLASS

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 1fc536b..b0754e3 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -171,6 +171,7 @@ private[spark] object JsonProtocol {
   def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
     ("Event" -> Utils.getFormattedClassName(applicationStart)) ~
     ("App Name" -> applicationStart.appName) ~
+    ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser)
   }
@@ -484,9 +485,10 @@ private[spark] object JsonProtocol {
 
   def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
     val appName = (json \ "App Name").extract[String]
+    val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
-    SparkListenerApplicationStart(appName, time, sparkUser)
+    SparkListenerApplicationStart(appName, appId, time, sparkUser)
   }
 
   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 41e58a0..fead883 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -229,7 +229,8 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
     val conf = getLoggingConf(logDirPath, compressionCodec)
     val eventLogger = new EventLoggingListener("test", conf)
     val listenerBus = new LiveListenerBus
-    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+      125L, "Mickey")
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 8f0ee9f..7ab351d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -83,7 +83,8 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     val fstream = fileSystem.create(logFilePath)
     val cstream = codec.map(_.compressedOutputStream(fstream)).getOrElse(fstream)
     val writer = new PrintWriter(cstream)
-    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", 125L, "Mickey")
+    val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
+      125L, "Mickey")
     val applicationEnd = SparkListenerApplicationEnd(1000L)
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index c84bafc..2b45d8b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -60,7 +60,7 @@ class JsonProtocolSuite extends FunSuite {
     val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
       BlockManagerId("Scarce", "to be counted...", 100))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
-    val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
+    val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
@@ -176,6 +176,13 @@ class JsonProtocolSuite extends FunSuite {
       deserializedBmRemoved)
   }
 
+  test("SparkListenerApplicationStart backwards compatibility") {
+    // SparkListenerApplicationStart events in Spark 1.0.0 do not have an "App ID" property.
+    val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+    val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
+      .removeField({ _._1 == "App ID" })
+    assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
+  }
 
   /** -------------------------- *
    | Helper test running methods |

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index a2f1b35..855d5cc 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -111,6 +111,8 @@ object MimaExcludes {
           MimaBuild.excludeSparkClass("storage.Values") ++
           MimaBuild.excludeSparkClass("storage.Entry") ++
           MimaBuild.excludeSparkClass("storage.MemoryStore$Entry") ++
+          // Class was missing "@DeveloperApi" annotation in 1.0.
+          MimaBuild.excludeSparkClass("scheduler.SparkListenerApplicationStart") ++
           Seq(
             ProblemFilters.exclude[IncompatibleMethTypeProblem](
               "org.apache.spark.mllib.tree.impurity.Gini.calculate"),
@@ -119,14 +121,14 @@ object MimaExcludes {
             ProblemFilters.exclude[IncompatibleMethTypeProblem](
               "org.apache.spark.mllib.tree.impurity.Variance.calculate")
           ) ++
-          Seq ( // Package-private classes removed in SPARK-2341
+          Seq( // Package-private classes removed in SPARK-2341
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.BinaryLabelParser$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"),
             ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$")
-          ) ++ 
+          ) ++
           Seq( // package-private classes removed in MLlib
             ProblemFilters.exclude[MissingMethodProblem](
               "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne")

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 8c54840..98039a2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
 import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
@@ -70,6 +71,8 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
   private val sparkContextRef = new AtomicReference[SparkContext](null)
 
   final def run(): Int = {
+    val appAttemptId = client.getAttemptId()
+
     if (isDriver) {
       // Set the web ui port to be ephemeral for yarn so we don't conflict with
       // other spark processes running on the same box
@@ -77,9 +80,12 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
 
       // Set the master property to match the requested mode.
       System.setProperty("spark.master", "yarn-cluster")
+
+      // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
+      System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
     }
 
-    logInfo("ApplicationAttemptId: " + client.getAttemptId())
+    logInfo("ApplicationAttemptId: " + appAttemptId)
 
     val cleanupHook = new Runnable {
       override def run() {
@@ -151,13 +157,20 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
+  private def registerAM(uiAddress: String) = {
     val sc = sparkContextRef.get()
+
+    val appId = client.getAttemptId().getApplicationId().toString()
+    val historyAddress =
+      sparkConf.getOption("spark.yarn.historyServer.address")
+        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+        .getOrElse("")
+
     allocator = client.register(yarnConf,
       if (sc != null) sc.getConf else sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
-      uiHistoryAddress)
+      historyAddress)
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
@@ -175,7 +188,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
     } else {
-      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
+      registerAM(sc.ui.appUIHostPort)
       try {
         userThread.join()
       } finally {
@@ -190,8 +203,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       conf = sparkConf, securityManager = securityMgr)._1
     actor = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
-      sparkConf.get("spark.driver.appUIHistoryAddress", ""))
+    registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ffe2731..dc77f12 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.deploy.history.HistoryServer
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -156,19 +155,6 @@ object YarnSparkHadoopUtil {
     }
   }
 
-  def getUIHistoryAddress(sc: SparkContext, conf: SparkConf) : String = {
-    val eventLogDir = sc.eventLogger match {
-      case Some(logger) => logger.getApplicationLogDir()
-      case None => ""
-    }
-    val historyServerAddress = conf.get("spark.yarn.historyServer.address", "")
-    if (historyServerAddress != "" && eventLogDir != "") {
-      historyServerAddress + HistoryServer.UI_PATH_PREFIX + s"/$eventLogDir"
-    } else {
-      ""
-    }
-  }
-
   /**
    * Escapes a string for inclusion in a command line executed by Yarn. Yarn executes commands
    * using `bash -c "command arg1 arg2"` and that means plain quoting doesn't really work. The

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index a5f537d..41c662c 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -56,7 +56,6 @@ private[spark] class YarnClientSchedulerBackend(
     val driverPort = conf.get("spark.driver.port")
     val hostport = driverHost + ":" + driverPort
     conf.set("spark.driver.appUIAddress", sc.ui.appUIHostPort)
-    conf.set("spark.driver.appUIHistoryAddress", YarnSparkHadoopUtil.getUIHistoryAddress(sc, conf))
 
     val argsArrayBuf = new ArrayBuffer[String]()
     argsArrayBuf += (
@@ -150,4 +149,7 @@ private[spark] class YarnClientSchedulerBackend(
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
+
+  override def applicationId(): Option[String] = Option(appId).map(_.toString())
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f2b5b619/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 5566522..39436d0 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -28,7 +28,7 @@ private[spark] class YarnClusterSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
   var totalExpectedExecutors = 0
-  
+
   if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
     minRegisteredRatio = 0.8
   }
@@ -47,4 +47,7 @@ private[spark] class YarnClusterSchedulerBackend(
   override def sufficientResourcesRegistered(): Boolean = {
     totalRegisteredExecutors.get() >= totalExpectedExecutors * minRegisteredRatio
   }
+
+  override def applicationId(): Option[String] = sc.getConf.getOption("spark.yarn.app.id")
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org