Posted to commits@spark.apache.org by ir...@apache.org on 2015/05/01 16:51:00 UTC

spark git commit: [SPARK-4705] Handle multiple app attempts event logs, history server.

Repository: spark
Updated Branches:
  refs/heads/master 7fe0f3f2b -> 3052f4916


[SPARK-4705] Handle multiple app attempts event logs, history server.

This change modifies the event logging listener to write the logs for different application
attempts to different files. The attempt ID is set by the scheduler backend, so as long
as the backend returns that ID to SparkContext, per-attempt logging works. Currently, only
the YARN backend does that.
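
(Illustration only, not part of this commit's diff: a cluster manager integration
opts in by overriding the new hook added to SchedulerBackend further down. The
class name and the attempt id value are made up, the other members are minimal
stubs, and the class has to live under org.apache.spark because the trait is
private[spark].)

    private[spark] class MyClusterBackend extends SchedulerBackend {
      override def start(): Unit = ()
      override def stop(): Unit = ()
      override def reviveOffers(): Unit = ()
      override def defaultParallelism(): Int = 2

      // New in this change: report the cluster manager's attempt id so that
      // SparkContext can forward it to the event log and the history server.
      override def applicationAttemptId(): Option[String] =
        Some("appattempt_1430421582063_0007_000001")
    }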

The history server was also modified to model multiple attempts per application. Each
attempt has its own UI and a separate row in the listing table, so that users can look at
all the attempts separately. The UI "adapts" itself to avoid showing attempt-specific info
when all the applications being shown have a single attempt.
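
(Illustration only, with made-up ids: HistoryServer.getAttemptURI, added later in
this diff, gives each attempt its own address under the application, assuming
UI_PATH_PREFIX is "/history".)

    HistoryServer.getAttemptURI("application_1430421582063_0007", Some("000001"))
    // => /history/application_1430421582063_0007/000001
    HistoryServer.getAttemptURI("local-1430421582063", None)
    // => /history/local-1430421582063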

Author: Marcelo Vanzin <va...@cloudera.com>
Author: twinkle sachdeva <tw...@kite.ggn.in.guavus.com>
Author: twinkle.sachdeva <tw...@guavus.com>
Author: twinkle sachdeva <tw...@guavus.com>

Closes #5432 from vanzin/SPARK-4705 and squashes the following commits:

7e289fa [Marcelo Vanzin] Review feedback.
f66dcc5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
bc885b7 [Marcelo Vanzin] Review feedback.
76a3651 [Marcelo Vanzin] Fix log cleaner, add test.
7c381ec [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
1aa309d [Marcelo Vanzin] Improve sorting of app attempts.
2ad77e7 [Marcelo Vanzin] Missed a reference to the old property name.
9d59d92 [Marcelo Vanzin] Scalastyle...
d5a9c37 [Marcelo Vanzin] Update JsonProtocol test, make property name consistent.
ba34b69 [Marcelo Vanzin] Use Option[String] for attempt id.
f1cb9b3 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
c14ec19 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
9092d39 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
86de638 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
07446c6 [Marcelo Vanzin] Disable striping for app id / name when multiple attempts exist.
9092af5 [Marcelo Vanzin] Fix HistoryServer test.
3a14503 [Marcelo Vanzin] Argh scalastyle.
657ec18 [Marcelo Vanzin] Fix yarn history URL, app links.
c3e0a82 [Marcelo Vanzin] Move app name to app info, more UI fixes.
ce5ee5d [Marcelo Vanzin] Misc UI, test, style fixes.
cbe8bba [Marcelo Vanzin] Attempt ID in listener event should be an option.
88b1de8 [Marcelo Vanzin] Add a test for apps with multiple attempts.
3245aa2 [Marcelo Vanzin] Make app attempts part of the history server model.
5fd5c6f [Marcelo Vanzin] Fix my broken rebase.
318525a [twinkle.sachdeva] SPARK-4705: 1) moved from directory structure to single file, as per the master branch. 2) Added the attempt id inside the SparkListenerApplicationStart, to make the info available independent of directory structure. 3) Changes in History Server to render the UI as per the snapshot II
6b2e521 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
4c1fc26 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
0eb7722 [twinkle sachdeva] SPARK-4705: Doing cherry-pick of fix into master


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3052f491
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3052f491
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3052f491

Branch: refs/heads/master
Commit: 3052f4916e7f2c7fbc4837f00f4463b7d0b34718
Parents: 7fe0f3f
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Fri May 1 09:50:55 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Fri May 1 09:50:55 2015 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   8 +-
 .../history/ApplicationHistoryProvider.scala    |  13 +-
 .../deploy/history/FsHistoryProvider.scala      | 211 ++++++++++------
 .../spark/deploy/history/HistoryPage.scala      | 111 +++++++--
 .../spark/deploy/history/HistoryServer.scala    |  24 +-
 .../org/apache/spark/deploy/master/Master.scala |  20 +-
 .../scheduler/ApplicationEventListener.scala    |   2 +
 .../spark/scheduler/EventLoggingListener.scala  |  28 ++-
 .../spark/scheduler/SchedulerBackend.scala      |   8 +
 .../apache/spark/scheduler/SparkListener.scala  |   4 +-
 .../apache/spark/scheduler/TaskScheduler.scala  |  10 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   2 +
 .../org/apache/spark/util/JsonProtocol.scala    |   6 +-
 .../deploy/history/FsHistoryProviderSuite.scala | 243 ++++++++++++++-----
 .../deploy/history/HistoryServerSuite.scala     |   3 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +
 .../scheduler/EventLoggingListenerSuite.scala   |  18 +-
 .../spark/scheduler/ReplayListenerSuite.scala   |   4 +-
 .../apache/spark/util/JsonProtocolSuite.scala   |  11 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   7 +-
 .../cluster/YarnClusterSchedulerBackend.scala   |  12 +-
 21 files changed, 546 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index fe24260..3f7cba6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private var _heartbeatReceiver: RpcEndpointRef = _
   @volatile private var _dagScheduler: DAGScheduler = _
   private var _applicationId: String = _
+  private var _applicationAttemptId: Option[String] = None
   private var _eventLogger: Option[EventLoggingListener] = None
   private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
   private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   def applicationId: String = _applicationId
+  def applicationAttemptId: Option[String] = _applicationAttemptId
 
   def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
 
@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _taskScheduler.start()
 
     _applicationId = _taskScheduler.applicationId()
+    _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
     _env.blockManager.initialize(_applicationId)
 
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _eventLogger =
       if (isEventLogEnabled) {
         val logger =
-          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+            _conf, _hadoopConfiguration)
         logger.start()
         listenerBus.addListener(logger)
         Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser))
+      startTime, sparkUser, applicationAttemptId))
   }
 
   /** Post the application end event */

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index ea6c85e..6a5011a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.ui.SparkUI
 
-private[history] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
+private[history] case class ApplicationAttemptInfo(
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = false)
 
+private[history] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    attempts: List[ApplicationAttemptInfo])
+
 private[history] abstract class ApplicationHistoryProvider {
 
   /**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
+   * @param attemptId The application attempt ID (or None if there is no attempt ID).
    * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
 
   /**
    * Called when the server is shutting down.

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index fb2cbbc..993763f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
@@ -40,8 +40,12 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }
 
   import FsHistoryProvider._
 
@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 
   override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
 
-  override def getAppUI(appId: String): Option[SparkUI] = {
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
-      applications.get(appId).map { info =>
-        val replayBus = new ReplayListenerBus()
-        val ui = {
-          val conf = this.conf.clone()
-          val appSecManager = new SecurityManager(conf)
-          SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
-          // Do not call ui.bind() to avoid creating a new server for each application
-        }
+      applications.get(appId).flatMap { appInfo =>
+        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+          val replayBus = new ReplayListenerBus()
+          val ui = {
+            val conf = this.conf.clone()
+            val appSecManager = new SecurityManager(conf)
+            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+            // Do not call ui.bind() to avoid creating a new server for each application
+          }
 
-        val appListener = new ApplicationEventListener()
-        replayBus.addListener(appListener)
-        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+          val appListener = new ApplicationEventListener()
+          replayBus.addListener(appListener)
+          val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
 
-        ui.setAppName(s"${appInfo.name} ($appId)")
+          ui.setAppName(s"${appInfo.name} ($appId)")
 
-        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-        ui.getSecurityManager.setAcls(uiAclsEnabled)
-        // make sure to set admin acls before view acls so they are properly picked up
-        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
-          appListener.viewAcls.getOrElse(""))
-        ui
+          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+          ui.getSecurityManager.setAcls(uiAclsEnabled)
+          // make sure to set admin acls before view acls so they are properly picked up
+          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+            appListener.viewAcls.getOrElse(""))
+          ui
+        }
       }
     } catch {
       case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
   private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
     val bus = new ReplayListenerBus()
-    val newApps = logs.flatMap { fileStatus =>
+    val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
         logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
             e)
           None
       }
-    }.toSeq.sortWith(compareAppInfo)
-
-    // When there are new logs, merge the new list with the existing one, maintaining
-    // the expected ordering (descending end time). Maintaining the order is important
-    // to avoid having to sort the list every time there is a request for the log list.
-    if (newApps.nonEmpty) {
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id) ||
-            mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
-            !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-          mergedApps += (info.id -> info)
-        }
-      }
+    }
 
-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
+    if (newAttempts.isEmpty) {
+      return
+    }
+
+    // Build a map containing all apps that contain new attempts. The app information in this map
+    // contains both the new app attempt, and those that were already loaded in the existing apps
+    // map. If an attempt has been updated, it replaces the old attempt in the list.
+    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+    newAttempts.foreach { attempt =>
+      val appInfo = newAppMap.get(attempt.appId)
+        .orElse(applications.get(attempt.appId))
+        .map { app =>
+          val attempts =
+            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+            attempts.sortWith(compareAttemptInfo))
         }
+        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+      newAppMap(attempt.appId) = appInfo
+    }
+
+    // Merge the new app list with the existing one, maintaining the expected ordering (descending
+    // end time). Maintaining the order is important to avoid having to sort the list every time
+    // there is a request for the log list.
+    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+      if (!mergedApps.contains(info.id)) {
+        mergedApps += (info.id -> info)
       }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
+    }
 
-      applications = mergedApps
+    val newIterator = newApps.iterator.buffered
+    val oldIterator = applications.values.iterator.buffered
+    while (newIterator.hasNext && oldIterator.hasNext) {
+      if (newAppMap.contains(oldIterator.head.id)) {
+        oldIterator.next()
+      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+        addIfAbsent(newIterator.next())
+      } else {
+        addIfAbsent(oldIterator.next())
+      }
     }
+    newIterator.foreach(addIfAbsent)
+    oldIterator.foreach(addIfAbsent)
+
+    applications = mergedApps
   }
 
   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }
+
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge || !info.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val (toClean, toRetain) = app.attempts.partition(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toRetain.nonEmpty) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
         }
       }
 
       applications = appsToRetain
 
-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }
 
-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def compareAppInfo(
       i1: FsApplicationHistoryInfo,
       i2: FsApplicationHistoryInfo): Boolean = {
-    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+    val a1 = i1.attempts.head
+    val a2 = i2.attempts.head
+    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+  }
+
+  /**
+   * Comparison function that defines the sort order for application attempts within the same
+   * application. Order is: running attempts before complete attempts, running attempts sorted
+   * by start time, completed attempts sorted by end time.
+   *
+   * Normally applications should have a single running attempt; but failure to call sc.stop()
+   * may cause multiple running attempts to show up.
+   *
+   * @return Whether `a1` should precede `a2`.
+   */
+  private def compareAttemptInfo(
+      a1: FsApplicationAttemptInfo,
+      a2: FsApplicationAttemptInfo): Boolean = {
+    if (a1.completed == a2.completed) {
+      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+    } else {
+      !a1.completed
+    }
   }
 
   /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      new FsApplicationHistoryInfo(
+      new FsApplicationAttemptInfo(
         logPath.getName(),
-        appListener.appId.getOrElse(logPath.getName()),
         appListener.appName.getOrElse(NOT_STARTED),
+        appListener.appId.getOrElse(logPath.getName()),
+        appListener.appAttemptId,
         appListener.startTime.getOrElse(-1L),
         appListener.endTime.getOrElse(-1L),
         getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }
 
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
     val logPath: String,
-    id: String,
-    name: String,
+    val name: String,
+    val appId: String,
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+  extends ApplicationAttemptInfo(
+      attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+    id: String,
+    override val name: String,
+    override val attempts: List[FsApplicationAttemptInfo])
+  extends ApplicationHistoryInfo(id, name, attempts)
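
(Illustration only, with made-up attempts: compareAttemptInfo above orders an
application's attempts so that running attempts come first, newest start time
first, followed by completed attempts, most recent end time first. Given
attempt1 completed at t=100, attempt2 completed at t=250 and attempt3 still
running, the order is attempt3, attempt2, attempt1; attempt3 becomes
attempts.head, which is what the listing page and compareAppInfo look at.)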

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index 3781b4e..0830cc1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -34,18 +34,28 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     val requestedIncomplete =
       Option(request.getParameter("showIncomplete")).getOrElse("false").toBoolean
 
-    val allApps = parent.getApplicationList().filter(_.completed != requestedIncomplete)
-    val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
-    val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))
+    val allApps = parent.getApplicationList()
+      .filter(_.attempts.head.completed != requestedIncomplete)
+    val allAppsSize = allApps.size
+
+    val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
+    val appsToShow = allApps.slice(actualFirst, actualFirst + pageSize)
 
     val actualPage = (actualFirst / pageSize) + 1
-    val last = Math.min(actualFirst + pageSize, allApps.size) - 1
-    val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
+    val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
+    val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)
 
     val secondPageFromLeft = 2
     val secondPageFromRight = pageCount - 1
 
-    val appTable = UIUtils.listingTable(appHeader, appRow, apps)
+    val hasMultipleAttempts = appsToShow.exists(_.attempts.size > 1)
+    val appTable =
+      if (hasMultipleAttempts) {
+        UIUtils.listingTable(appWithAttemptHeader, appWithAttemptRow, appsToShow)
+      } else {
+        UIUtils.listingTable(appHeader, appRow, appsToShow)
+      }
+
     val providerConfig = parent.getProviderConfig()
     val content =
       <div class="row-fluid">
@@ -59,7 +69,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
             // to the first and last page. If the current page +/- `plusOrMinus` is greater
             // than the 2nd page from the first page or less than the 2nd page from the last
             // page, `...` will be displayed.
-            if (allApps.size > 0) {
+            if (allAppsSize > 0) {
               val leftSideIndices =
                 rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _, requestedIncomplete)
               val rightSideIndices =
@@ -67,7 +77,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
                   requestedIncomplete)
 
               <h4>
-                Showing {actualFirst + 1}-{last + 1} of {allApps.size}
+                Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
                 {if (requestedIncomplete) "(Incomplete applications)"}
                 <span style="float: right">
                   {
@@ -125,30 +135,85 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     "Spark User",
     "Last Updated")
 
-  private def rangeIndices(range: Seq[Int], condition: Int => Boolean, showIncomplete: Boolean):
-  Seq[Node] = {
+  private val appWithAttemptHeader = Seq(
+    "App ID",
+    "App Name",
+    "Attempt ID",
+    "Started",
+    "Completed",
+    "Duration",
+    "Spark User",
+    "Last Updated")
+
+  private def rangeIndices(
+      range: Seq[Int],
+      condition: Int => Boolean,
+      showIncomplete: Boolean): Seq[Node] = {
     range.filter(condition).map(nextPage =>
       <a href={makePageLink(nextPage, showIncomplete)}> {nextPage} </a>)
   }
 
-  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
-    val startTime = UIUtils.formatDate(info.startTime)
-    val endTime = if (info.endTime > 0) UIUtils.formatDate(info.endTime) else "-"
+  private def attemptRow(
+      renderAttemptIdColumn: Boolean,
+      info: ApplicationHistoryInfo,
+      attempt: ApplicationAttemptInfo,
+      isFirst: Boolean): Seq[Node] = {
+    val uiAddress = HistoryServer.getAttemptURI(info.id, attempt.attemptId)
+    val startTime = UIUtils.formatDate(attempt.startTime)
+    val endTime = if (attempt.endTime > 0) UIUtils.formatDate(attempt.endTime) else "-"
     val duration =
-      if (info.endTime > 0) UIUtils.formatDuration(info.endTime - info.startTime) else "-"
-    val lastUpdated = UIUtils.formatDate(info.lastUpdated)
+      if (attempt.endTime > 0) {
+        UIUtils.formatDuration(attempt.endTime - attempt.startTime)
+      } else {
+        "-"
+      }
+    val lastUpdated = UIUtils.formatDate(attempt.lastUpdated)
     <tr>
-      <td><a href={uiAddress}>{info.id}</a></td>
-      <td>{info.name}</td>
-      <td sorttable_customkey={info.startTime.toString}>{startTime}</td>
-      <td sorttable_customkey={info.endTime.toString}>{endTime}</td>
-      <td sorttable_customkey={(info.endTime - info.startTime).toString}>{duration}</td>
-      <td>{info.sparkUser}</td>
-      <td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
+      {
+        if (isFirst) {
+          if (info.attempts.size > 1 || renderAttemptIdColumn) {
+            <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+              <a href={uiAddress}>{info.id}</a></td>
+            <td rowspan={info.attempts.size.toString} style="background-color: #ffffff">
+              {info.name}</td>
+          } else {
+            <td><a href={uiAddress}>{info.id}</a></td>
+            <td>{info.name}</td>
+          }
+        } else {
+          Nil
+        }
+      }
+      {
+        if (renderAttemptIdColumn) {
+          if (info.attempts.size > 1 && attempt.attemptId.isDefined) {
+            <td><a href={HistoryServer.getAttemptURI(info.id, attempt.attemptId)}>
+              {attempt.attemptId.get}</a></td>
+          } else {
+            <td>&nbsp;</td>
+          }
+        } else {
+          Nil
+        }
+      }
+      <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
+      <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
+      <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
+        {duration}</td>
+      <td>{attempt.sparkUser}</td>
+      <td sorttable_customkey={attempt.lastUpdated.toString}>{lastUpdated}</td>
     </tr>
   }
 
+  private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    attemptRow(false, info, info.attempts.head, true)
+  }
+
+  private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
+    attemptRow(true, info, info.attempts.head, true) ++
+      info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
+  }
+
   private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
     "/?" + Array(
       "page=" + linkPage,

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 56bef57..754c8e9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -52,7 +52,11 @@ class HistoryServer(
 
   private val appLoader = new CacheLoader[String, SparkUI] {
     override def load(key: String): SparkUI = {
-      val ui = provider.getAppUI(key).getOrElse(throw new NoSuchElementException())
+      val parts = key.split("/")
+      require(parts.length == 1 || parts.length == 2, s"Invalid app key $key")
+      val ui = provider
+        .getAppUI(parts(0), if (parts.length > 1) Some(parts(1)) else None)
+        .getOrElse(throw new NoSuchElementException())
       attachSparkUI(ui)
       ui
     }
@@ -69,6 +73,8 @@ class HistoryServer(
 
   private val loaderServlet = new HttpServlet {
     protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
+      // Parse the URI created by getAttemptURI(). It contains an app ID and an optional
+      // attempt ID (separated by a slash).
       val parts = Option(req.getPathInfo()).getOrElse("").split("/")
       if (parts.length < 2) {
         res.sendError(HttpServletResponse.SC_BAD_REQUEST,
@@ -76,18 +82,23 @@ class HistoryServer(
         return
       }
 
-      val appId = parts(1)
+      val appKey =
+        if (parts.length == 3) {
+          s"${parts(1)}/${parts(2)}"
+        } else {
+          parts(1)
+        }
 
       // Note we don't use the UI retrieved from the cache; the cache loader above will register
       // the app's UI, and all we need to do is redirect the user to the same URI that was
       // requested, and the proper data should be served at that point.
       try {
-        appCache.get(appId)
+        appCache.get(appKey)
         res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
       } catch {
         case e: Exception => e.getCause() match {
           case nsee: NoSuchElementException =>
-            val msg = <div class="row-fluid">Application {appId} not found.</div>
+            val msg = <div class="row-fluid">Application {appKey} not found.</div>
             res.setStatus(HttpServletResponse.SC_NOT_FOUND)
             UIUtils.basicSparkPage(msg, "Not Found").foreach(
               n => res.getWriter().write(n.toString))
@@ -213,4 +224,9 @@ object HistoryServer extends Logging {
     }
   }
 
+  private[history] def getAttemptURI(appId: String, attemptId: Option[String]): String = {
+    val attemptSuffix = attemptId.map { id => s"/$id" }.getOrElse("")
+    s"${HistoryServer.UI_PATH_PREFIX}/${appId}${attemptSuffix}"
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 1c21c17..dc6077f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -62,7 +62,7 @@ private[master] class Master(
   private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
 
   private def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
-  
+
   private val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
   private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
   private val RETAINED_DRIVERS = conf.getInt("spark.deploy.retainedDrivers", 200)
@@ -86,7 +86,7 @@ private[master] class Master(
   private val drivers = new HashSet[DriverInfo]
   private val completedDrivers = new ArrayBuffer[DriverInfo]
   // Drivers currently spooled for scheduling
-  private val waitingDrivers = new ArrayBuffer[DriverInfo] 
+  private val waitingDrivers = new ArrayBuffer[DriverInfo]
   private var nextDriverNumber = 0
 
   Utils.checkHost(host, "Expected hostname")
@@ -758,24 +758,24 @@ private[master] class Master(
           app.desc.appUiUrl = notFoundBasePath
           return false
         }
-      
+
       val eventLogFilePrefix = EventLoggingListener.getLogPath(
-          eventLogDir, app.id, app.desc.eventLogCodec)
+          eventLogDir, app.id, None, app.desc.eventLogCodec)
       val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
-      val inProgressExists = fs.exists(new Path(eventLogFilePrefix + 
+      val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
           EventLoggingListener.IN_PROGRESS))
-      
+
       if (inProgressExists) {
         // Event logging is enabled for this application, but the application is still in progress
         logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
       }
-      
+
       val (eventLogFile, status) = if (inProgressExists) {
         (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
       } else {
         (eventLogFilePrefix, " (completed)")
       }
-      
+
       val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
@@ -859,8 +859,8 @@ private[master] class Master(
   }
 
   private def removeDriver(
-      driverId: String, 
-      finalState: DriverState, 
+      driverId: String,
+      finalState: DriverState,
       exception: Option[Exception]) {
     drivers.find(d => d.id == driverId) match {
       case Some(driver) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
index 6d39a5e..9f218c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala
@@ -26,6 +26,7 @@ package org.apache.spark.scheduler
 private[spark] class ApplicationEventListener extends SparkListener {
   var appName: Option[String] = None
   var appId: Option[String] = None
+  var appAttemptId: Option[String] = None
   var sparkUser: Option[String] = None
   var startTime: Option[Long] = None
   var endTime: Option[Long] = None
@@ -35,6 +36,7 @@ private[spark] class ApplicationEventListener extends SparkListener {
   override def onApplicationStart(applicationStart: SparkListenerApplicationStart) {
     appName = Some(applicationStart.appName)
     appId = applicationStart.appId
+    appAttemptId = applicationStart.appAttemptId
     startTime = Some(applicationStart.time)
     sparkUser = Some(applicationStart.sparkUser)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 08e7727..529a5b2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -47,6 +47,7 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
+    appAttemptId : Option[String],
     logBaseDir: URI,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
@@ -54,8 +55,9 @@ private[spark] class EventLoggingListener(
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
-    this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, appAttemptId : Option[String], logBaseDir: URI, sparkConf: SparkConf) =
+    this(appId, appAttemptId, logBaseDir, sparkConf,
+      SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -89,7 +91,7 @@ private[spark] class EventLoggingListener(
   private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
 
   // Visible for tests only.
-  private[scheduler] val logPath = getLogPath(logBaseDir, appId, compressionCodecName)
+  private[scheduler] val logPath = getLogPath(logBaseDir, appId, appAttemptId, compressionCodecName)
 
   /**
    * Creates the log file in the configured log directory.
@@ -252,8 +254,12 @@ private[spark] object EventLoggingListener extends Logging {
    * we won't know which codec to use to decompress the metadata needed to open the file in
    * the first place.
    *
+   * The log file name will identify the compression codec used for the contents, if any.
+   * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
+   * @param appAttemptId A unique attempt id of appId. May be the empty string.
    * @param compressionCodecName Name to identify the codec used to compress the contents
    *                             of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
@@ -261,11 +267,19 @@ private[spark] object EventLoggingListener extends Logging {
   def getLogPath(
       logBaseDir: URI,
       appId: String,
+      appAttemptId: Option[String],
       compressionCodecName: Option[String] = None): String = {
-    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
-    // e.g. app_123, app_123.lzf
-    val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
-    logBaseDir.toString.stripSuffix("/") + "/" + logName
+    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val codec = compressionCodecName.map("." + _).getOrElse("")
+    if (appAttemptId.isDefined) {
+      base + "_" + sanitize(appAttemptId.get) + codec
+    } else {
+      base + codec
+    }
+  }
+
+  private def sanitize(str: String): String = {
+    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
   }
 
   /**
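
(Illustration only, with hypothetical ids: getLogPath above appends the sanitized
attempt id to the sanitized app id, before the optional codec extension.)

    EventLoggingListener.getLogPath(logBaseDir, "app-20150501", None)
    // => <logBaseDir>/app-20150501
    EventLoggingListener.getLogPath(logBaseDir, "app-20150501", Some("1"), Some("lzf"))
    // => <logBaseDir>/app-20150501_1.lzf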

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 992c477..6468205 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,12 @@ private[spark] trait SchedulerBackend {
    */
   def applicationId(): String = appId
 
+  /**
+   * Get the attempt ID for this run, if the cluster manager supports multiple
+   * attempts. Applications run in client mode will not have attempt IDs.
+   *
+   * @return The application attempt id, if available.
+   */
+  def applicationAttemptId(): Option[String] = None
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index b711ff2..169d4fd 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -110,8 +110,8 @@ case class SparkListenerExecutorMetricsUpdate(
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerApplicationStart(appName: String, appId: Option[String], time: Long,
-  sparkUser: String) extends SparkListenerEvent
+case class SparkListenerApplicationStart(appName: String, appId: Option[String],
+   time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
 
 @DeveloperApi
 case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index ed34186..f25f3ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,9 +73,17 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
-  
+
   /**
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
+
+  /**
+   * Get an application's attempt ID associated with the job.
+   *
+   * @return An application's Attempt ID
+   */
+  def applicationAttemptId(): Option[String]
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 13a52d8..b4b8a63 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -513,6 +513,8 @@ private[spark] class TaskSchedulerImpl(
 
   override def applicationId(): String = backend.applicationId()
 
+  override def applicationAttemptId(): Option[String] = backend.applicationAttemptId()
+
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 44d2749..ee02fbd 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -194,7 +194,8 @@ private[spark] object JsonProtocol {
     ("App Name" -> applicationStart.appName) ~
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
-    ("User" -> applicationStart.sparkUser)
+    ("User" -> applicationStart.sparkUser) ~
+    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -562,7 +563,8 @@ private[spark] object JsonProtocol {
     val appId = Utils.jsonOption(json \ "App ID").map(_.extract[String])
     val time = (json \ "Timestamp").extract[Long]
     val sparkUser = (json \ "User").extract[String]
-    SparkListenerApplicationStart(appName, appId, time, sparkUser)
+    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
+    SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
   }
 
   def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
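
(Illustration only, with made-up values: after this change a logged application
start event carries an extra "App Attempt ID" field; the field is omitted when
no attempt id is set.)

    {
      "Event": "SparkListenerApplicationStart",
      "App Name": "my-app",
      "App ID": "application_1430421582063_0007",
      "Timestamp": 1430421582063,
      "User": "test",
      "App Attempt ID": "000001"
    }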

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 9e367a0..a0a0afa 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{BufferedOutputStream, File, FileOutputStream, OutputStreamWriter}
 import java.net.URI
+import java.util.concurrent.TimeUnit
 
 import scala.io.Source
 
@@ -30,7 +31,7 @@ import org.scalatest.Matchers
 import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.io._
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{JsonProtocol, Utils}
+import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers with Logging {
 
@@ -47,10 +48,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   /** Create a fake log file using the new log format used in Spark 1.3+ */
   private def newLogFile(
       appId: String,
+      appAttemptId: Option[String],
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
     val logPath = new URI(logUri).getPath + ip
     new File(logPath)
   }
@@ -59,22 +61,23 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     val provider = new FsHistoryProvider(createTestConf())
 
     // Write a new-style application log.
-    val newAppComplete = newLogFile("new1", inProgress = false)
+    val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
+      SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(5L)
       )
 
     // Write a new-style application log.
-    val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
+    val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
+      Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
+      SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
-    val newAppIncomplete = newLogFile("new2", inProgress = true)
+    val newAppIncomplete = newLogFile("new2", None, inProgress = true)
     writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
+      SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
       )
 
     // Write an old-style application log.
@@ -82,7 +85,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     oldAppComplete.mkdir()
     createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-complete", None, 2L, "test"),
+      SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
     createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -96,33 +99,45 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     oldAppIncomplete.mkdir()
     createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
     writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test")
+      SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
       )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
     // Take the opportunity to check that the offset checks work as expected.
-    provider.checkForLogs()
+    updateAndCheck(provider) { list =>
+      list.size should be (5)
+      list.count(_.attempts.head.completed) should be (3)
+
+      def makeAppInfo(
+          id: String,
+          name: String,
+          start: Long,
+          end: Long,
+          lastMod: Long,
+          user: String,
+          completed: Boolean): ApplicationHistoryInfo = {
+        ApplicationHistoryInfo(id, name,
+          List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
+      }
 
-    val list = provider.getListing().toSeq
-    list should not be (null)
-    list.size should be (5)
-    list.count(_.completed) should be (3)
-
-    list(0) should be (ApplicationHistoryInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
-      newAppComplete.lastModified(), "test", true))
-    list(1) should be (ApplicationHistoryInfo(newAppCompressedComplete.getName(),
-      "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
-    list(2) should be (ApplicationHistoryInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
-      oldAppComplete.lastModified(), "test", true))
-    list(3) should be (ApplicationHistoryInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L,
-      -1L, oldAppIncomplete.lastModified(), "test", false))
-    list(4) should be (ApplicationHistoryInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L,
-      -1L, newAppIncomplete.lastModified(), "test", false))
-
-    // Make sure the UI can be rendered.
-    list.foreach { case info =>
-      val appUi = provider.getAppUI(info.id)
-      appUi should not be null
+      list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
+        newAppComplete.lastModified(), "test", true))
+      list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
+        "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
+        true))
+      list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
+        oldAppComplete.lastModified(), "test", true))
+      list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
+        oldAppIncomplete.lastModified(), "test", false))
+      list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
+        newAppIncomplete.lastModified(), "test", false))
+
+      // Make sure the UI can be rendered.
+      list.foreach { case info =>
+        val appUi = provider.getAppUI(info.id, None)
+        appUi should not be null
+        appUi should not be None
+      }
     }
   }
 
@@ -138,7 +153,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       logDir.mkdir()
       createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
       writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
-        SparkListenerApplicationStart("app2", None, 2L, "test"),
+        SparkListenerApplicationStart("app2", None, 2L, "test", None),
         SparkListenerApplicationEnd(3L)
         )
       createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -159,52 +174,52 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
   }
 
   test("SPARK-3697: ignore directories that cannot be read.") {
-    val logFile1 = newLogFile("new1", inProgress = false)
+    val logFile1 = newLogFile("new1", None, inProgress = false)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1-1", None, 1L, "test"),
+      SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
-    val logFile2 = newLogFile("new2", inProgress = false)
+    val logFile2 = newLogFile("new2", None, inProgress = false)
     writeFile(logFile2, true, None,
-      SparkListenerApplicationStart("app1-2", None, 1L, "test"),
+      SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
       SparkListenerApplicationEnd(2L)
       )
     logFile2.setReadable(false, false)
 
     val provider = new FsHistoryProvider(createTestConf())
-    provider.checkForLogs()
-
-    val list = provider.getListing().toSeq
-    list should not be (null)
-    list.size should be (1)
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+    }
   }
 
   test("history file is renamed from inprogress to completed") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = newLogFile("app1", inProgress = true)
+    val logFile1 = newLogFile("app1", None, inProgress = true)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
       SparkListenerApplicationEnd(2L)
     )
-    provider.checkForLogs()
-    val appListBeforeRename = provider.getListing()
-    appListBeforeRename.size should be (1)
-    appListBeforeRename.head.logPath should endWith(EventLoggingListener.IN_PROGRESS)
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
+        endWith(EventLoggingListener.IN_PROGRESS)
+    }
 
-    logFile1.renameTo(newLogFile("app1", inProgress = false))
-    provider.checkForLogs()
-    val appListAfterRename = provider.getListing()
-    appListAfterRename.size should be (1)
-    appListAfterRename.head.logPath should not endWith(EventLoggingListener.IN_PROGRESS)
+    logFile1.renameTo(newLogFile("app1", None, inProgress = false))
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
+        endWith(EventLoggingListener.IN_PROGRESS)
+    }
   }
 
   test("SPARK-5582: empty log directory") {
     val provider = new FsHistoryProvider(createTestConf())
 
-    val logFile1 = newLogFile("app1", inProgress = true)
+    val logFile1 = newLogFile("app1", None, inProgress = true)
     writeFile(logFile1, true, None,
-      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", None),
       SparkListenerApplicationEnd(2L))
 
     val oldLog = new File(testDir, "old1")
@@ -215,6 +230,126 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
     appListAfterRename.size should be (1)
   }
 
+  test("apps with multiple attempts") {
+    val provider = new FsHistoryProvider(createTestConf())
+
+    val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(attempt1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+      )
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (1)
+    }
+
+    val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
+    writeFile(attempt2, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
+      )
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (2)
+      list.head.attempts.head.attemptId should be (Some("attempt2"))
+    }
+
+    val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    attempt2.delete()
+    writeFile(completedAttempt2, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+      )
+
+    updateAndCheck(provider) { list =>
+      list should not be (null)
+      list.size should be (1)
+      list.head.attempts.size should be (2)
+      list.head.attempts.head.attemptId should be (Some("attempt2"))
+    }
+
+    val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
+    writeFile(app2Attempt1, true, None,
+      SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(6L)
+      )
+
+    updateAndCheck(provider) { list =>
+      list.size should be (2)
+      list.head.attempts.size should be (1)
+      list.last.attempts.size should be (2)
+      list.head.attempts.head.attemptId should be (Some("attempt1"))
+
+      list.foreach { case app =>
+        app.attempts.foreach { attempt =>
+          val appUi = provider.getAppUI(app.id, attempt.attemptId)
+          appUi should not be null
+        }
+      }
+
+    }
+  }
+
+  test("log cleaner") {
+    val maxAge = TimeUnit.SECONDS.toMillis(10)
+    val clock = new ManualClock(maxAge / 2)
+    val provider = new FsHistoryProvider(
+      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+
+    val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
+    writeFile(log1, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
+      SparkListenerApplicationEnd(2L)
+      )
+    log1.setLastModified(0L)
+
+    val log2 = newLogFile("app1", Some("attempt2"), inProgress = false)
+    writeFile(log2, true, None,
+      SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
+      SparkListenerApplicationEnd(4L)
+      )
+    log2.setLastModified(clock.getTimeMillis())
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (2)
+    }
+
+    // Move the clock forward so log1 exceeds the max age.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (1)
+      list.head.attempts.size should be (1)
+      list.head.attempts.head.attemptId should be (Some("attempt2"))
+    }
+    assert(!log1.exists())
+
+    // Do the same for the other log.
+    clock.advance(maxAge)
+
+    updateAndCheck(provider) { list =>
+      list.size should be (0)
+    }
+    assert(!log2.exists())
+  }
+
+  /**
+   * Asks the provider to check for logs and calls a function to perform checks on the updated
+   * app list. Example:
+   *
+   *     updateAndCheck(provider) { list =>
+   *       // asserts
+   *     }
+   */
+  private def updateAndCheck(provider: FsHistoryProvider)
+      (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
+    provider.checkForLogs()
+    provider.cleanLogs()
+    checkFn(provider.getListing().toSeq)
+  }
+
   private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
     events: SparkListenerEvent*) = {
     val fstream = new FileOutputStream(file)

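The suite's newLogFile helper is not shown in these hunks; its new (appId, attemptId, inProgress) signature suggests it derives the per-attempt file name from EventLoggingListener.getLogPath. A minimal sketch under that assumption (the real helper may differ in details):

    private def newLogFile(appId: String, appAttemptId: Option[String], inProgress: Boolean): File = {
      // The per-attempt log path comes from the listener's own naming scheme; the
      // ".inprogress" suffix marks attempts that have not finished yet.
      val suffix = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
      val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
      new File(new java.net.URI(logUri).getPath + suffix)
    }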
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index 20de46f..71ba9c1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -36,7 +36,8 @@ class HistoryServerSuite extends FunSuite with Matchers with MockitoSugar {
     val request = mock[HttpServletRequest]
     val ui = mock[SparkUI]
     val link = "/history/app1"
-    val info = new ApplicationHistoryInfo("app1", "app1", 0, 2, 1, "xxx", true)
+    val info = new ApplicationHistoryInfo("app1", "app1",
+      List(ApplicationAttemptInfo(None, 0, 2, 1, "xxx", true)))
     when(historyServer.getApplicationList()).thenReturn(Seq(info))
     when(ui.basePath).thenReturn(link)
     when(historyServer.getProviderConfig()).thenReturn(Map[String, String]())

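The constructor call above implies the shape of the new listing model: an application entry now carries a list of attempts instead of a single set of timestamps. A rough sketch inferred from the arguments (field names are guesses; only the arity and types come from the test):

    case class ApplicationAttemptInfo(
        attemptId: Option[String],
        startTime: Long,
        endTime: Long,
        lastUpdated: Long,
        sparkUser: String,
        completed: Boolean)

    case class ApplicationHistoryInfo(
        id: String,
        name: String,
        attempts: List[ApplicationAttemptInfo])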
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3c52a8c..2482603 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -95,6 +95,7 @@ class DAGSchedulerSuite
     override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
     override def defaultParallelism() = 2
     override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+    override def applicationAttemptId(): Option[String] = None
   }
 
   /** Length of time to wait while draining listener events. */
@@ -404,6 +405,7 @@ class DAGSchedulerSuite
           taskMetrics: Array[(Long, TaskMetrics)],
           blockManagerId: BlockManagerId): Boolean = true
       override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {}
+      override def applicationAttemptId(): Option[String] = None
     }
     val noKillScheduler = new DAGScheduler(
       sc,

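Both mock schedulers gain the same one-line override, which suggests the scheduler interface now exposes the attempt ID alongside the existing applicationId(). A plausible declaration, assumed rather than shown in these hunks:

    // Backends that know their attempt (such as the YARN cluster backend further
    // below) override this; everything else can fall back to None.
    def applicationAttemptId(): Option[String] = None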
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 6d25edb..b52a8d1 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", None, testDirPath.toUri(), conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test", None)
     val logPath = new URI(logUri).getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
@@ -108,18 +108,18 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Event log name") {
     // without compression
     assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
-      Utils.resolveURI("/base-dir"), "app1"))
+      Utils.resolveURI("/base-dir"), "app1", None))
     // with compression
     assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", None, Some("lzf")))
     // illegal characters in app ID
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
       EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-        "a fine:mind$dollar{bills}.1"))
+        "a fine:mind$dollar{bills}.1", None))
     // illegal characters in app ID with compression
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
       EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
-        "a fine:mind$dollar{bills}.1", Some("lz4")))
+        "a fine:mind$dollar{bills}.1", None, Some("lz4")))
   }
 
   /* ----------------- *
@@ -140,10 +140,10 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
-      125L, "Mickey")
+      125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
@@ -186,7 +186,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val eventLogPath = eventLogger.logPath
     val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
-      expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
+      expectedLogDir, sc.applicationId, None, compressionCodec.map(CompressionCodec.getShortName)))
 
     // Begin listening for events that trigger asserts
     val eventExistenceListener = new EventExistenceListener(eventLogger)

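The tests above construct the listener directly; in a running application the attempt ID has to reach the listener's second constructor argument from whichever backend knows it. A hypothetical wiring sketch (applicationId, eventLogDir and conf are placeholders, not taken from this diff):

    // Sketch only: hand the backend-reported attempt ID to the event logger so
    // each attempt writes its events to its own log file.
    val attemptId: Option[String] = taskScheduler.applicationAttemptId()
    val eventLogger = new EventLoggingListener(applicationId, attemptId, eventLogDir, conf)
    eventLogger.start()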
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index 6de6d2f..dabe457 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -50,7 +50,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
     val fstream = fileSystem.create(logFilePath)
     val writer = new PrintWriter(fstream)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
-      125L, "Mickey")
+      125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
     writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -146,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * log the events.
    */
   private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", new URI("testdir"), conf) {
+    extends EventLoggingListener("test", None, new URI("testdir"), conf) {
 
     override def start() { }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 2d039cb..0c9cf5b 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -74,7 +74,8 @@ class JsonProtocolSuite extends FunSuite {
     val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
       BlockManagerId("Scarce", "to be counted...", 100))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
-    val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
+    val applicationStart = SparkListenerApplicationStart("The winner of all", Some("appId"),
+      42L, "Garfield", Some("appAttempt"))
     val applicationEnd = SparkListenerApplicationEnd(42L)
     val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
@@ -274,9 +275,11 @@ class JsonProtocolSuite extends FunSuite {
 
   test("SparkListenerApplicationStart backwards compatibility") {
     // SparkListenerApplicationStart in Spark 1.0.0 do not have an "appId" property.
-    val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user")
+    // SparkListenerApplicationStart pre-Spark 1.4 does not have an "appAttemptId" property.
+    val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None)
     val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
       .removeField({ _._1 == "App ID" })
+      .removeField({ _._1 == "App Attempt ID" })
     assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
   }
 
@@ -1497,8 +1500,10 @@ class JsonProtocolSuite extends FunSuite {
       |{
       |  "Event": "SparkListenerApplicationStart",
       |  "App Name": "The winner of all",
+      |  "App ID": "appId",
       |  "Timestamp": 42,
-      |  "User": "Garfield"
+      |  "User": "Garfield",
+      |  "App Attempt ID": "appAttempt"
       |}
     """
 

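The backwards-compatibility test above removes both optional fields before parsing, so JsonProtocol presumably reads them as options rather than required properties. Roughly, as a sketch and not the exact implementation:

    // Sketch: treat a missing "App Attempt ID" as None so pre-1.4 logs still replay cleanly.
    val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])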
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 70cb57f..27f8047 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -89,6 +89,10 @@ private[spark] class ApplicationMaster(
 
         // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
         System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
+
+        // Propagate the attempt ID so that, when event logging is enabled, each
+        // attempt's events are written to a separate log file.
+        System.setProperty("spark.yarn.app.attemptId", appAttemptId.getAttemptId().toString())
       }
 
       logInfo("ApplicationAttemptId: " + appAttemptId)
@@ -208,10 +212,11 @@ private[spark] class ApplicationMaster(
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
+    val attemptId = client.getAttemptId().getAttemptId().toString()
     val historyAddress =
       sparkConf.getOption("spark.yarn.historyServer.address")
         .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
-        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}" }
+        .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
         .getOrElse("")
 
     allocator = client.register(yarnConf,

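To make the new tracking URL concrete: with spark.yarn.historyServer.address resolving to, say, http://history-host:18080 (a hypothetical host) and attempt 1 of a hypothetical application_1430000000000_0001, the address registered with YARN becomes http://history-host:18080/history/application_1430000000000_0001/1, where it previously stopped at the application ID.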
http://git-wip-us.apache.org/repos/asf/spark/blob/3052f491/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index b1de81e..aeb218a 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -39,12 +39,18 @@ private[spark] class YarnClusterSchedulerBackend(
   }
 
   override def applicationId(): String =
-    // In YARN Cluster mode, spark.yarn.app.id is expect to be set
-    // before user application is launched.
-    // So, if spark.yarn.app.id is not set, it is something wrong.
+    // In YARN Cluster mode, the application ID is expected to be set, so log an error if it's
+    // not found.
     sc.getConf.getOption("spark.yarn.app.id").getOrElse {
       logError("Application ID is not set.")
       super.applicationId
     }
 
+  override def applicationAttemptId(): Option[String] =
+    // In YARN Cluster mode, the attempt ID is expected to be set, so log an error if it's
+    // not found.
+    sc.getConf.getOption("spark.yarn.app.attemptId").orElse {
+      logError("Application attempt ID is not set.")
+      super.applicationAttemptId
+    }
 }

