Posted to commits@spark.apache.org by sr...@apache.org on 2016/11/14 11:15:33 UTC

spark git commit: [SPARK-18010][CORE] Reduce work performed for building up the application list for the History Server app list UI page

Repository: spark
Updated Branches:
  refs/heads/branch-2.0 646cc8552 -> 26ae5cfa7


[SPARK-18010][CORE] Reduce work performed for building up the application list for the History Server app list UI page

## What changes were proposed in this pull request?

Backport of pull request #15556.

Allow ReplayListenerBus to skip deserializing and replaying certain events, using an inexpensive string check on the raw event log entry. Use this to ensure that when event log replay is triggered for building the application list, ReplayListenerBus skips over all but the few events needed for that purpose. See [SPARK-18010] for the motivation behind this change.
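
As a minimal, illustrative sketch of the mechanism (not code from this patch): the filter is a plain predicate over the raw JSON line, so an event can be rejected with a cheap `startsWith` check before any deserialization happens. The prefixes below mirror the `APPL_START_EVENT_PREFIX`/`APPL_END_EVENT_PREFIX` constants added in the diff; note that `ReplayListenerBus` is `private[spark]`, so the snippet assumes it lives inside that package.

```scala
import org.apache.spark.scheduler.ReplayListenerBus.ReplayEventsFilter

// Accept only application start/end events; all other log lines are
// skipped before the (expensive) JSON parse and listener replay.
val startEndOnly: ReplayEventsFilter = { eventString =>
  eventString.startsWith("{\"Event\":\"SparkListenerApplicationStart\"") ||
    eventString.startsWith("{\"Event\":\"SparkListenerApplicationEnd\"")
}
```
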
## How was this patch tested?

Tested with existing HistoryServer and ReplayListener unit test suites. All tests pass.


Author: Vinayak <vijoshi5@in.ibm.com>

Closes #15556 from vijoshi/SAAS-467_master.

Author: Vinayak <vi...@in.ibm.com>

Closes #15655 from vijoshi/SAAS-467_branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26ae5cfa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26ae5cfa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26ae5cfa

Branch: refs/heads/branch-2.0
Commit: 26ae5cfa7592cd84b5cc3361dfb0951ffa7409a9
Parents: 646cc85
Author: Vinayak <vi...@in.ibm.com>
Authored: Mon Nov 14 12:15:27 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Nov 14 12:15:27 2016 +0100

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 120 +++++++++++--------
 .../spark/scheduler/ReplayListenerBus.scala     |  35 ++++--
 2 files changed, 97 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26ae5cfa/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index cf4a401..a1e36c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -35,6 +35,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.ReplayListenerBus._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
@@ -77,10 +78,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   import FsHistoryProvider._
 
-  private val NOT_STARTED = "<Not Started>"
-
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
-
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -238,11 +235,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
-          val appListener = new ApplicationEventListener()
-          replayBus.addListener(appListener)
-          val appAttemptInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)),
-            replayBus)
-          appAttemptInfo.map { info =>
+
+          val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+
+          val appListener = replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+
+          if (appListener.appId.isDefined) {
             val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
             ui.getSecurityManager.setAcls(uiAclsEnabled)
             // make sure to set admin acls before view acls so they are properly picked up
@@ -251,8 +249,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               appListener.viewAcls.getOrElse(""))
             ui.getSecurityManager.setAdminAclsGroups(appListener.adminAclsGroups.getOrElse(""))
             ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse(""))
-            LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))
+            Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize)))
+          } else {
+            None
           }
+
         }
       }
     } catch {
@@ -392,28 +393,54 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
-
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
   private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
     val newAttempts = try {
-        val bus = new ReplayListenerBus()
-        val res = replay(fileStatus, bus)
-        res match {
-          case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully: $r")
-          case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
-            "The application may have not started.")
-        }
-        res
-      } catch {
-        case e: Exception =>
-          logError(
-            s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
-            e)
-          None
+      val eventsFilter: ReplayEventsFilter = { eventString =>
+        eventString.startsWith(APPL_START_EVENT_PREFIX) ||
+          eventString.startsWith(APPL_END_EVENT_PREFIX)
+      }
+
+      val logPath = fileStatus.getPath()
+
+      val appCompleted = isApplicationCompleted(fileStatus)
+
+      val appListener = replay(fileStatus, appCompleted, new ReplayListenerBus(), eventsFilter)
+
+      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+      // try to show their UI.
+      if (appListener.appId.isDefined) {
+        val attemptInfo = new FsApplicationAttemptInfo(
+          logPath.getName(),
+          appListener.appName.getOrElse(NOT_STARTED),
+          appListener.appId.getOrElse(logPath.getName()),
+          appListener.appAttemptId,
+          appListener.startTime.getOrElse(-1L),
+          appListener.endTime.getOrElse(-1L),
+          fileStatus.getModificationTime(),
+          appListener.sparkUser.getOrElse(NOT_STARTED),
+          appCompleted,
+          fileStatus.getLen()
+        )
+        fileToAppInfo(logPath) = attemptInfo
+        logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
+        Some(attemptInfo)
+      } else {
+        logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
+          "The application may have not started.")
+        None
       }
 
+    } catch {
+      case e: Exception =>
+        logError(
+          s"Exception encountered when attempting to load application log ${fileStatus.getPath}",
+          e)
+        None
+    }
+
     if (newAttempts.isEmpty) {
       return
     }
@@ -550,12 +577,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * Replays the events in the specified log file and returns information about the associated
-   * application. Return `None` if the application ID cannot be located.
+   * Replays the events in the specified log file on the supplied `ReplayListenerBus`. Returns
+   * an `ApplicationEventListener` instance with event data captured from the replay.
+   * `ReplayEventsFilter` determines what events are replayed and can therefore limit the
+   * data captured in the returned `ApplicationEventListener` instance.
    */
   private def replay(
       eventLog: FileStatus,
-      bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
+      appCompleted: Boolean,
+      bus: ReplayListenerBus,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): ApplicationEventListener = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     // Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
@@ -567,30 +598,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     val logInput = EventLoggingListener.openEventLog(logPath, fs)
     try {
       val appListener = new ApplicationEventListener
-      val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
-      bus.replay(logInput, logPath.toString, !appCompleted)
-
-      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
-      // try to show their UI.
-      if (appListener.appId.isDefined) {
-        val attemptInfo = new FsApplicationAttemptInfo(
-          logPath.getName(),
-          appListener.appName.getOrElse(NOT_STARTED),
-          appListener.appId.getOrElse(logPath.getName()),
-          appListener.appAttemptId,
-          appListener.startTime.getOrElse(-1L),
-          appListener.endTime.getOrElse(-1L),
-          eventLog.getModificationTime(),
-          appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted,
-          eventLog.getLen()
-        )
-        fileToAppInfo(logPath) = attemptInfo
-        Some(attemptInfo)
-      } else {
-        None
-      }
+      bus.replay(logInput, logPath.toString, !appCompleted, eventsFilter)
+      appListener
     } finally {
       logInput.close()
     }
@@ -675,6 +685,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
 private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  private val NOT_STARTED = "<Not Started>"
+
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
+  private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
+
+  private val APPL_END_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationEnd\""
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/26ae5cfa/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index c65e7a2..2424586 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -44,18 +44,32 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
    * @param sourceName Filename (or other source identifier) from whence @logData is being read
    * @param maybeTruncated Indicate whether log file might be truncated (some abnormal situations
    *        encountered, log file might not finished writing) or not
+   * @param eventsFilter Filter function to select JSON event strings in the log data stream that
+   *        should be parsed and replayed. When not specified, all event strings in the log data
+   *        are parsed and replayed.
    */
   def replay(
       logData: InputStream,
       sourceName: String,
-      maybeTruncated: Boolean = false): Unit = {
+      maybeTruncated: Boolean = false,
+      eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+
     var currentLine: String = null
-    var lineNumber: Int = 1
+    var lineNumber: Int = 0
+
     try {
-      val lines = Source.fromInputStream(logData).getLines()
-      while (lines.hasNext) {
-        currentLine = lines.next()
+      val lineEntries = Source.fromInputStream(logData)
+        .getLines()
+        .zipWithIndex
+        .filter { case (line, _) => eventsFilter(line) }
+
+      while (lineEntries.hasNext) {
         try {
+          val entry = lineEntries.next()
+
+          currentLine = entry._1
+          lineNumber = entry._2 + 1
+
           postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
         } catch {
           case e: ClassNotFoundException if KNOWN_REMOVED_CLASSES.contains(e.getMessage) =>
@@ -64,14 +78,15 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
             logWarning(s"Dropped incompatible Structured Streaming log: $currentLine")
           case jpe: JsonParseException =>
             // We can only ignore exception from last line of the file that might be truncated
-            if (!maybeTruncated || lines.hasNext) {
+            // the last entry may not be the very last line in the event log, but we treat it
+            // as such in a best effort to replay the given input
+            if (!maybeTruncated || lineEntries.hasNext) {
               throw jpe
             } else {
               logWarning(s"Got JsonParseException from log file $sourceName" +
                 s" at line $lineNumber, the file might not have finished writing cleanly.")
             }
         }
-        lineNumber += 1
       }
     } catch {
       case ioe: IOException =>
@@ -84,8 +99,14 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
 
 }
 
+
 private[spark] object ReplayListenerBus {
 
+  type ReplayEventsFilter = (String) => Boolean
+
+  // utility filter that selects all event logs during replay
+  val SELECT_ALL_FILTER: ReplayEventsFilter = { (eventString: String) => true }
+
   /**
    * Classes that were removed. Structured Streaming doesn't use them any more. However, parsing
    * old json may fail and we can just ignore these failures.


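For orientation, a hedged end-to-end sketch of driving the new filtered replay path shown in the diff above. The log path is hypothetical, the stream is assumed to be uncompressed (`FsHistoryProvider` instead opens it via `EventLoggingListener.openEventLog`), and since `ReplayListenerBus` and `ApplicationEventListener` are `private[spark]`, the snippet assumes it runs inside that package:

```scala
import java.io.FileInputStream

import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
import org.apache.spark.scheduler.ReplayListenerBus._

// Hypothetical path to an uncompressed event log file.
val logStream = new FileInputStream("/tmp/spark-events/app-0000")

// Same start/end-only filter the patch installs for listing replay.
val startEndOnly: ReplayEventsFilter = { s =>
  s.startsWith("{\"Event\":\"SparkListenerApplicationStart\"") ||
    s.startsWith("{\"Event\":\"SparkListenerApplicationEnd\"")
}

val bus = new ReplayListenerBus()
val appListener = new ApplicationEventListener
bus.addListener(appListener)

// Lines failing the filter are never JSON-parsed or posted to listeners.
bus.replay(logStream, "app-0000", maybeTruncated = false,
  eventsFilter = startEndOnly)

println(appListener.appId) // Some(...) only if a start event was replayed

logStream.close()
```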