You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "mridulm (via GitHub)" <gi...@apache.org> on 2023/12/06 07:00:45 UTC

Re: [PR] [SPARK-23607][CORE] Use HDFS extended attributes to store application summary information in SHS [spark]

mridulm commented on code in PR #43939:
URL: https://github.com/apache/spark/pull/43939#discussion_r1416699520


##########
core/src/main/scala/org/apache/spark/deploy/history/EventLogFileWriters.scala:
##########
@@ -230,6 +233,17 @@ class SingleEventLogFileWriter(
     writeLine(eventJson, flushLogger)
   }
 
+  override def writeToXAttr(attrName: String, attrValue: String): Unit = {
+    try {
+      fileSystem.setXAttr(new Path(inProgressPath), attrName, attrValue.getBytes())
+    } catch {
+      case _: IOException =>
+        logInfo(s"Failed to set extended attribute ${attrName}")
+      case _: UnsupportedOperationException =>
+        logInfo("setXAttr not supported by filesystem")

Review Comment:
   Instead of repeatedly calling this on filesystems which don't support `setXAttr` - if it fails with `UnsupportedOperationException`, avoid further calls by keeping track of it ?



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))

Review Comment:
   This formulation is a bit confusing - it is not a method `get` being invoked with the key as parameter - but rather, an `apply` on the map returned.
   
   Which leads to a bug - we cannot assume the key will exist in the map, since a write of xattr could have failed.
   
   Something like this would be clearer, and avoid the issue:
   ```suggestion
           appId = appStartInfo.get.get(EventLoggingListener.USER_APP_ID)
           attemptId = appStartInfo.get.get(EventLoggingListener.USER_ATTEMPT_ID)
   ```



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {

Review Comment:
   In this method, we are progressively invoking `getXAttrs` based on previous successfully retrieved xattrs - why not make a call upfront for the entire set of relevant keys instead ? It should result in a reduced number of calls to `FileSystem`, and so be cheaper as well, right ?
   
   We can update `xAttrStatus` progressively based on which keys exist in the result xattr map.



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()

Review Comment:
   Do the decrement in `try`/`finally`



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1580,53 +1854,53 @@ private[history] class AppListingListener(
     }
   }
 
-  private class MutableApplicationInfo {
-    var id: String = null
-    var name: String = null
+}
 
-    def toView(): ApplicationInfoWrapper = {
-      val apiInfo = ApplicationInfo(id, name, None, None, None, None, Nil)
-      new ApplicationInfoWrapper(apiInfo, List(attempt.toView()))
-    }
+private class MutableApplicationInfo {
+  var id: Option[String] = None
+  var name: String = null
 
+  def toView(attempts: List[AttemptInfoWrapper]): ApplicationInfoWrapper = {
+    val apiInfo = ApplicationInfo(id.get, name, None, None, None, None, Nil)
+    new ApplicationInfoWrapper(apiInfo, attempts)
   }
 
-  private class MutableAttemptInfo(logPath: String, fileSize: Long, lastIndex: Option[Long]) {
-    var attemptId: Option[String] = None
-    var startTime = new Date(-1)
-    var endTime = new Date(-1)
-    var lastUpdated = new Date(-1)
-    var duration = 0L
-    var sparkUser: String = null
-    var completed = false
-    var appSparkVersion = ""
-
-    var adminAcls: Option[String] = None
-    var viewAcls: Option[String] = None
-    var adminAclsGroups: Option[String] = None
-    var viewAclsGroups: Option[String] = None
-
-    def toView(): AttemptInfoWrapper = {
-      val apiInfo = ApplicationAttemptInfo(
-        attemptId,
-        startTime,
-        endTime,
-        lastUpdated,
-        duration,
-        sparkUser,
-        completed,
-        appSparkVersion)
-      new AttemptInfoWrapper(
-        apiInfo,
-        logPath,
-        fileSize,
-        lastIndex,
-        adminAcls,
-        viewAcls,
-        adminAclsGroups,
-        viewAclsGroups)
-    }
+}
 
+private class MutableAttemptInfo(logPath: String, fileSize: Long, lastIndex: Option[Long] = None) {
+  var attemptId: Option[String] = None
+  var startTime = new Date(-1)
+  var endTime = new Date(-1)
+  var lastUpdated = new Date(-1)
+  var duration = 0L
+  var sparkUser: String = null
+  var completed = false
+  var appSparkVersion = ""
+
+  var adminAcls: Option[String] = None
+  var viewAcls: Option[String] = None
+  var adminAclsGroups: Option[String] = None
+  var viewAclsGroups: Option[String] = None
+
+  def toView(): AttemptInfoWrapper = {
+    val apiInfo = ApplicationAttemptInfo(
+      attemptId,
+      startTime,
+      endTime,
+      lastUpdated,
+      duration,
+      sparkUser,
+      completed,
+      appSparkVersion)
+    new AttemptInfoWrapper(
+      apiInfo,
+      logPath,
+      fileSize,
+      lastIndex,
+      adminAcls,
+      viewAcls,
+      adminAclsGroups,
+      viewAclsGroups)

Review Comment:
   Is this an indentation change ? If yes, revert.



##########
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala:
##########
@@ -74,12 +78,15 @@ private[spark] class EventLoggingListener(
   private val liveStageExecutorMetrics =
     mutable.HashMap.empty[(Int, Int), mutable.HashMap[String, ExecutorMetrics]]
 
+  private var envUpdateXAttrFlushed = false

Review Comment:
   `envUpdateXAttrFlushed` -> `envXAttrUpdated`



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))
+        xAttrStatus = LogXAttrStatus.APP_STARTED
+      }
+    }
+
+    /**
+     * Try to get application env update information
+     * If we are able to successfully get them, change the status to APP_ENV_UPDATED
+     */
+    if(xAttrStatus == LogXAttrStatus.APP_STARTED) {
+      envUpdateInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ACLS)
+      if(envUpdateInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_ENV_UPDATED
+      }
+    }
+
+    /**
+     * Try to get the application attempt end time
+     * If we are able to successfully get them, change the status to APP_END
+     */
+    if (xAttrStatus == LogXAttrStatus.APP_ENV_UPDATED) {
+      appEndInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ENDTIME)
+      if (appEndInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_END
+      }
+    }
+
+    // Check if status is updated
+    if(xAttrStatus != info.logXAttrStatus) {
+      val appListingFromXAttr = new AppListingEntryFromXAttr(reader, clock)
+      if (info.logXAttrStatus != LogXAttrStatus.XATTR_ENABLED && appId.isDefined) {
+        val oldApp: Option[ApplicationInfoWrapper] = try {
+          Some(listing.read(classOf[ApplicationInfoWrapper], appId.get))
+        } catch {
+          case _: NoSuchElementException => None
+        }
+        if (oldApp.isDefined) {
+          val oldAppAttemptList = oldApp.get.attempts.filter(_.info.attemptId == attemptId)
+          if (oldAppAttemptList.size == 1) {
+            appListingFromXAttr.applicationInfoFromDB(oldApp.get.info.id,
+              oldApp.get.info.name, oldAppAttemptList.last)
+          }
+        }
+      }
+      if (appStartInfo.isDefined) {
+        appListingFromXAttr.applicationStartFromXAttr(appStartInfo.get)
+      }
+      if (envUpdateInfo.isDefined) {
+        appListingFromXAttr.applicationEnvUpdateFromXAttr(envUpdateInfo.get)
+      }
+      if(appEndInfo.isDefined) {
+        appListingFromXAttr.applicationEndFromXAttr(appEndInfo.get)
+      }
+      appListingFromXAttr.applicationInfo match {
+        case Some(app) =>
+          invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+          addListing(app)
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            Some(app.info.id), app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
+            reader.lastIndex, None, reader.completed, xAttrStatus))
+          if (isCompleted(reader.rootPath.toString)) {
+            removeInProgressEntry(reader.rootPath)
+          }
+        case _ =>
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
+          reader.completed, xAttrStatus))
+      }
+    }
+    logInfo(s"Finished reading extended attributes for file : ${reader.rootPath}")
+    endProcessing(reader.rootPath)
+    pendingReplayTasksCount.decrementAndGet()
+  }
+
+  /**
+   * For a finished log, remove the corresponding "in progress" entry from the listing DB if
+   * the file is really gone.
+   * @param logPath in progress log file path to be removed
+   */
+  private def removeInProgressEntry(logPath: Path): Unit = {
+    if (isCompleted(logPath.getName())) {
+      val inProgressLog = logPath.toString() + EventLogFileWriter.IN_PROGRESS
+      try {
+        // Fetch the entry first to avoid an RPC when it's already removed.
+        listing.read(classOf[LogInfo], inProgressLog)
+        val fileStatus = fs.getFileStatus(new Path(inProgressLog))
+        if (!fileStatus.isFile) {
+          listing.delete(classOf[LogInfo], inProgressLog)
+        }
+      } catch {
+        case _: NoSuchElementException =>
+      }
+    }
+  }
+
+  /**
+   * @param name of the log file
+   * @return true if log file ends with inprogress
+   */
+  private def isCompleted(name: String): Boolean = {
+    !name.endsWith(EventLogFileWriter.IN_PROGRESS)
+  }
+
+  private def getXAttrs(path: Path, nameList: List[String]): Option[Map[String, String]] = {
+    try {
+      val valueMap = fs.getXAttrs(path, nameList.asJava)
+      if(valueMap != null) {
+        Some(valueMap.asScala.toMap.map(value => value._1 ->
+          new String(value._2, StandardCharsets.UTF_8)))
+      } else {
+        None
+      }
+    } catch {
+      case _: IOException =>
+        logWarning(s"Unable to get extended attributes ${nameList}")
+        None
+    }
+  }
+
+  /**
+   * Get the value of a single attribute from HDFS
+   * @param path path to HDFS
+   * @param name Key of extended attribute
+   * @return Value of extended attribute
+   */
+  private def getXAttr(path: Path, name: String): Option[String] = {
+      val valueMap = getXAttrs(path, List(name))
+      if (!valueMap.isEmpty) {
+        Some(valueMap.get(name))
+      } else {
+        None
+      }
+  }
+
+  /**
+   * For a new file, update listing entry with whether extended attributes
+   * are enabled or disabled on this log file
+   *
+   * @param reader event log file reader
+   * @param scanTime current scan timestamp
+   * @return
+   */
+  private def handleNewLogFile(reader: EventLogFileReader, scanTime: Long): Boolean = {
+    var xAttrStatus = LogXAttrStatus.XATTR_DISABLED
+    if(HISTORY_LOG_USE_XATTR) {
+      val xAttrEnabled = getXAttr(reader.rootPath, EventLoggingListener.USER_XATTR_ENABLED)
+      if (xAttrEnabled.exists(data =>
+        new String(data.getBytes(StandardCharsets.UTF_8), StandardCharsets.UTF_8) == "true")) {

Review Comment:
   ```suggestion
         if (xAttrEnabled.exists(_ == "true")) {
   ```
   
   
   Or better still, `if (xAttrEnabled == Some("true"))`



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -920,6 +917,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  private def updateApplicationInfoFromLog(reader: EventLogFileReader, scanTime: Long): Unit = {
+    if (!HISTORY_LOG_USE_XATTR) {
+      mergeApplicationListing(reader, scanTime, true)
+    } else {
+      try {
+        val log = listing.read(classOf[LogInfo], reader.rootPath.toString)
+        if (log.logXAttrStatus != LogXAttrStatus.XATTR_DISABLED) {

Review Comment:
   For a migration case, for existing entries in 'older shs', will `logXAttrStatus` be `null` or `XATTR_DISABLED` ? Have we validated it ?



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))
+        xAttrStatus = LogXAttrStatus.APP_STARTED
+      }
+    }
+
+    /**
+     * Try to get application env update information
+     * If we are able to successfully get them, change the status to APP_ENV_UPDATED
+     */
+    if(xAttrStatus == LogXAttrStatus.APP_STARTED) {
+      envUpdateInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ACLS)
+      if(envUpdateInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_ENV_UPDATED
+      }
+    }
+
+    /**
+     * Try to get the application attempt end time
+     * If we are able to successfully get them, change the status to APP_END
+     */
+    if (xAttrStatus == LogXAttrStatus.APP_ENV_UPDATED) {
+      appEndInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ENDTIME)
+      if (appEndInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_END
+      }
+    }
+
+    // Check if status is updated
+    if(xAttrStatus != info.logXAttrStatus) {
+      val appListingFromXAttr = new AppListingEntryFromXAttr(reader, clock)
+      if (info.logXAttrStatus != LogXAttrStatus.XATTR_ENABLED && appId.isDefined) {
+        val oldApp: Option[ApplicationInfoWrapper] = try {
+          Some(listing.read(classOf[ApplicationInfoWrapper], appId.get))
+        } catch {
+          case _: NoSuchElementException => None
+        }
+        if (oldApp.isDefined) {
+          val oldAppAttemptList = oldApp.get.attempts.filter(_.info.attemptId == attemptId)
+          if (oldAppAttemptList.size == 1) {
+            appListingFromXAttr.applicationInfoFromDB(oldApp.get.info.id,
+              oldApp.get.info.name, oldAppAttemptList.last)
+          }
+        }
+      }
+      if (appStartInfo.isDefined) {
+        appListingFromXAttr.applicationStartFromXAttr(appStartInfo.get)
+      }
+      if (envUpdateInfo.isDefined) {
+        appListingFromXAttr.applicationEnvUpdateFromXAttr(envUpdateInfo.get)
+      }
+      if(appEndInfo.isDefined) {
+        appListingFromXAttr.applicationEndFromXAttr(appEndInfo.get)
+      }
+      appListingFromXAttr.applicationInfo match {
+        case Some(app) =>
+          invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+          addListing(app)
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            Some(app.info.id), app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
+            reader.lastIndex, None, reader.completed, xAttrStatus))
+          if (isCompleted(reader.rootPath.toString)) {
+            removeInProgressEntry(reader.rootPath)
+          }
+        case _ =>
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
+          reader.completed, xAttrStatus))
+      }
+    }
+    logInfo(s"Finished reading extended attributes for file : ${reader.rootPath}")
+    endProcessing(reader.rootPath)
+    pendingReplayTasksCount.decrementAndGet()
+  }

Review Comment:
   The one issue I see with this method (and so the approach) is that it seems to assume `writeToXAttr` at driver will succeed.
   For example, if xattr update for `USER_ATTEMPT_ACLS` fails at driver, we will never fall back to reading it from event file - even if the application has completed (same for the other states in this method).
   
   Thoughts on how to mitigate this ?



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1580,53 +1854,53 @@ private[history] class AppListingListener(
     }
   }
 
-  private class MutableApplicationInfo {
-    var id: String = null
-    var name: String = null
+}
 
-    def toView(): ApplicationInfoWrapper = {
-      val apiInfo = ApplicationInfo(id, name, None, None, None, None, Nil)
-      new ApplicationInfoWrapper(apiInfo, List(attempt.toView()))
-    }
+private class MutableApplicationInfo {
+  var id: Option[String] = None

Review Comment:
   Why are we making this change (`id: String` -> `id: Option[String]`) ? Does not appear to be necessary.
   
   Note that if this is required, we have to make a bunch of other changes as well.
   For example, in `applicationInfo` above, we should be checking for `isDefined` on `id`, not non-`null`, etc.



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))
+        xAttrStatus = LogXAttrStatus.APP_STARTED
+      }
+    }
+
+    /**
+     * Try to get application env update information
+     * If we are able to successfully get them, change the status to APP_ENV_UPDATED
+     */
+    if(xAttrStatus == LogXAttrStatus.APP_STARTED) {
+      envUpdateInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ACLS)
+      if(envUpdateInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_ENV_UPDATED
+      }
+    }
+
+    /**
+     * Try to get the application attempt end time
+     * If we are able to successfully get them, change the status to APP_END
+     */
+    if (xAttrStatus == LogXAttrStatus.APP_ENV_UPDATED) {
+      appEndInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ENDTIME)
+      if (appEndInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_END
+      }
+    }
+
+    // Check if status is updated
+    if(xAttrStatus != info.logXAttrStatus) {

Review Comment:
   This block also needs to handle the fact that appId might be updated, but attemptId might have failed (and so `None` after the suggested change above).
   
   Check validity of state and fail fast for this case (please check if anything else fits this situation - there are a bunch in `XATTRS_APPLICATION_START_LIST`).
   
   (Note - please see my comment below for more).



##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -251,6 +251,12 @@ package object config {
         "configured to be at least 10 MiB.")
       .createWithDefaultString("128m")
 
+  private[spark] val EVENT_LOG_USEXATTR =
+    ConfigBuilder("spark.eventLog.usexattr")
+      .version("3.5.0")

Review Comment:
   Change to `4.0.0`



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))
+        xAttrStatus = LogXAttrStatus.APP_STARTED
+      }
+    }
+
+    /**
+     * Try to get application env update information
+     * If we are able to successfully get them, change the status to APP_ENV_UPDATED
+     */
+    if(xAttrStatus == LogXAttrStatus.APP_STARTED) {
+      envUpdateInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ACLS)
+      if(envUpdateInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_ENV_UPDATED
+      }
+    }
+
+    /**
+     * Try to get the application attempt end time
+     * If we are able to successfully get them, change the status to APP_END
+     */
+    if (xAttrStatus == LogXAttrStatus.APP_ENV_UPDATED) {
+      appEndInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ENDTIME)
+      if (appEndInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_END
+      }
+    }
+
+    // Check if status is updated
+    if(xAttrStatus != info.logXAttrStatus) {
+      val appListingFromXAttr = new AppListingEntryFromXAttr(reader, clock)
+      if (info.logXAttrStatus != LogXAttrStatus.XATTR_ENABLED && appId.isDefined) {
+        val oldApp: Option[ApplicationInfoWrapper] = try {
+          Some(listing.read(classOf[ApplicationInfoWrapper], appId.get))
+        } catch {
+          case _: NoSuchElementException => None
+        }
+        if (oldApp.isDefined) {
+          val oldAppAttemptList = oldApp.get.attempts.filter(_.info.attemptId == attemptId)
+          if (oldAppAttemptList.size == 1) {
+            appListingFromXAttr.applicationInfoFromDB(oldApp.get.info.id,
+              oldApp.get.info.name, oldAppAttemptList.last)
+          }
+        }
+      }
+      if (appStartInfo.isDefined) {
+        appListingFromXAttr.applicationStartFromXAttr(appStartInfo.get)
+      }
+      if (envUpdateInfo.isDefined) {
+        appListingFromXAttr.applicationEnvUpdateFromXAttr(envUpdateInfo.get)
+      }
+      if(appEndInfo.isDefined) {
+        appListingFromXAttr.applicationEndFromXAttr(appEndInfo.get)
+      }
+      appListingFromXAttr.applicationInfo match {
+        case Some(app) =>
+          invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+          addListing(app)
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            Some(app.info.id), app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
+            reader.lastIndex, None, reader.completed, xAttrStatus))
+          if (isCompleted(reader.rootPath.toString)) {
+            removeInProgressEntry(reader.rootPath)
+          }
+        case _ =>
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
+          reader.completed, xAttrStatus))
+      }
+    }
+    logInfo(s"Finished reading extended attributes for file : ${reader.rootPath}")
+    endProcessing(reader.rootPath)
+    pendingReplayTasksCount.decrementAndGet()
+  }
+
+  /**
+   * For a finished log, remove the corresponding "in progress" entry from the listing DB if
+   * the file is really gone.
+   * @param logPath in progress log file path to be removed
+   */
+  private def removeInProgressEntry(logPath: Path): Unit = {

Review Comment:
   Modify `doMergeApplicationListingInternal` to use this ? Or is it doing anything different there ?



##########
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala:
##########
@@ -1177,6 +1191,181 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     listing.read(classOf[ApplicationInfoWrapper], appId)
   }
 
+  private def updateAppInfoFromXAttrs(
+    info: LogInfo, reader: EventLogFileReader, scanTime: Long): Unit = {
+    var xAttrStatus: LogXAttrStatus.Value = info.logXAttrStatus
+    var appStartInfo: Option[Map[String, String]] = None
+    var envUpdateInfo: Option[String] = None
+    var appEndInfo: Option[String] = None
+    var appId = info.appId
+    var attemptId = info.attemptId
+
+    pendingReplayTasksCount.incrementAndGet()
+    /**
+     * XATTR_ENABLED means it is a newly detected file
+     * If we are able to successfully get the application start information,
+     * change the status to APP_STARTED
+    */
+    if (xAttrStatus == LogXAttrStatus.XATTR_ENABLED) {
+      appStartInfo = getXAttrs(reader.rootPath, EventLoggingListener.XATTRS_APPLICATION_START_LIST)
+      if(appStartInfo.isDefined) {
+        appId = Some(appStartInfo.get(EventLoggingListener.USER_APP_ID))
+        attemptId = Some(appStartInfo.get(EventLoggingListener.USER_ATTEMPT_ID))
+        xAttrStatus = LogXAttrStatus.APP_STARTED
+      }
+    }
+
+    /**
+     * Try to get application env update information
+     * If we are able to successfully get them, change the status to APP_ENV_UPDATED
+     */
+    if(xAttrStatus == LogXAttrStatus.APP_STARTED) {
+      envUpdateInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ACLS)
+      if(envUpdateInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_ENV_UPDATED
+      }
+    }
+
+    /**
+     * Try to get the application attempt end time
+     * If we are able to successfully get them, change the status to APP_END
+     */
+    if (xAttrStatus == LogXAttrStatus.APP_ENV_UPDATED) {
+      appEndInfo = getXAttr(reader.rootPath, EventLoggingListener.USER_ATTEMPT_ENDTIME)
+      if (appEndInfo.isDefined) {
+        xAttrStatus = LogXAttrStatus.APP_END
+      }
+    }
+
+    // Check if status is updated
+    if(xAttrStatus != info.logXAttrStatus) {
+      val appListingFromXAttr = new AppListingEntryFromXAttr(reader, clock)
+      if (info.logXAttrStatus != LogXAttrStatus.XATTR_ENABLED && appId.isDefined) {
+        val oldApp: Option[ApplicationInfoWrapper] = try {
+          Some(listing.read(classOf[ApplicationInfoWrapper], appId.get))
+        } catch {
+          case _: NoSuchElementException => None
+        }
+        if (oldApp.isDefined) {
+          val oldAppAttemptList = oldApp.get.attempts.filter(_.info.attemptId == attemptId)
+          if (oldAppAttemptList.size == 1) {
+            appListingFromXAttr.applicationInfoFromDB(oldApp.get.info.id,
+              oldApp.get.info.name, oldAppAttemptList.last)
+          }
+        }
+      }
+      if (appStartInfo.isDefined) {
+        appListingFromXAttr.applicationStartFromXAttr(appStartInfo.get)
+      }
+      if (envUpdateInfo.isDefined) {
+        appListingFromXAttr.applicationEnvUpdateFromXAttr(envUpdateInfo.get)
+      }
+      if(appEndInfo.isDefined) {
+        appListingFromXAttr.applicationEndFromXAttr(appEndInfo.get)
+      }
+      appListingFromXAttr.applicationInfo match {
+        case Some(app) =>
+          invalidateUI(app.info.id, app.attempts.head.info.attemptId)
+          addListing(app)
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            Some(app.info.id), app.attempts.head.info.attemptId, reader.fileSizeForLastIndex,
+            reader.lastIndex, None, reader.completed, xAttrStatus))
+          if (isCompleted(reader.rootPath.toString)) {
+            removeInProgressEntry(reader.rootPath)
+          }
+        case _ =>
+          listing.write(LogInfo(reader.rootPath.toString, scanTime, LogType.EventLogs,
+            None, None, reader.fileSizeForLastIndex, reader.lastIndex, None,
+          reader.completed, xAttrStatus))
+      }
+    }
+    logInfo(s"Finished reading extended attributes for file : ${reader.rootPath}")
+    endProcessing(reader.rootPath)
+    pendingReplayTasksCount.decrementAndGet()
+  }
+
+  /**
+   * For a finished log, remove the corresponding "in progress" entry from the listing DB if
+   * the file is really gone.
+   * @param logPath in progress log file path to be removed
+   */
+  private def removeInProgressEntry(logPath: Path): Unit = {
+    if (isCompleted(logPath.getName())) {
+      val inProgressLog = logPath.toString() + EventLogFileWriter.IN_PROGRESS
+      try {
+        // Fetch the entry first to avoid an RPC when it's already removed.
+        listing.read(classOf[LogInfo], inProgressLog)
+        val fileStatus = fs.getFileStatus(new Path(inProgressLog))
+        if (!fileStatus.isFile) {
+          listing.delete(classOf[LogInfo], inProgressLog)
+        }
+      } catch {
+        case _: NoSuchElementException =>
+      }
+    }
+  }
+
+  /**
+   * @param name of the log file
+   * @return true if log file ends with inprogress
+   */
+  private def isCompleted(name: String): Boolean = {
+    !name.endsWith(EventLogFileWriter.IN_PROGRESS)
+  }
+
+  private def getXAttrs(path: Path, nameList: List[String]): Option[Map[String, String]] = {
+    try {
+      val valueMap = fs.getXAttrs(path, nameList.asJava)
+      if(valueMap != null) {
+        Some(valueMap.asScala.toMap.map(value => value._1 ->
+          new String(value._2, StandardCharsets.UTF_8)))
+      } else {
+        None
+      }
+    } catch {
+      case _: IOException =>
+        logWarning(s"Unable to get extended attributes ${nameList}")
+        None
+    }
+  }
+
+  /**
+   * Get the value of a single attribute from HDFS
+   * @param path path to HDFS
+   * @param name Key of extended attribute
+   * @return Value of extended attribute
+   */
+  private def getXAttr(path: Path, name: String): Option[String] = {
+      val valueMap = getXAttrs(path, List(name))
+      if (!valueMap.isEmpty) {
+        Some(valueMap.get(name))
+      } else {
+        None
+      }

Review Comment:
   ```suggestion
       getXAttrs(path, List(name)).flatMap(_.get(name))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org