You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by tg...@apache.org on 2016/04/21 13:58:20 UTC

spark git commit: [SPARK-13988][CORE] Make replaying event logs multi threaded in History server to ensure a single large log does not block other logs from being rendered.

Repository: spark
Updated Branches:
  refs/heads/master 4ac6e75cd -> 6fdd0e32a


[SPARK-13988][CORE] Make replaying event logs multi threaded in History server to ensure a single large log does not block other logs from being rendered.

## What changes were proposed in this pull request?
The patch makes event log processing multi-threaded.

## How was this patch tested?
Existing tests pass; no new tests are needed to test the functionality, as this is a perf improvement. I tested the patch locally by generating one big event log (big1), one small event log (small1), and then another big event log (big2). Without this patch, the UI does not render any app for almost 30 seconds, and then big2 and small1 appear; after another 30-second delay, big1 finally shows up in the UI as well. With this change, small1 shows up immediately, and big1 and big2 come up in 30 seconds. Locally, it also displays them in the correct order in the UI.

Author: Parth Brahmbhatt <pb...@netflix.com>

Closes #11800 from Parth-Brahmbhatt/SPARK-13988.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6fdd0e32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6fdd0e32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6fdd0e32

Branch: refs/heads/master
Commit: 6fdd0e32a6c3fdce1f3f7e1f8d252af05c419f7b
Parents: 4ac6e75
Author: Parth Brahmbhatt <pb...@netflix.com>
Authored: Thu Apr 21 06:58:00 2016 -0500
Committer: Tom Graves <tg...@yahoo-inc.com>
Committed: Thu Apr 21 06:58:00 2016 -0500

----------------------------------------------------------------------
 .../deploy/history/FsHistoryProvider.scala      | 92 +++++++++++---------
 docs/monitoring.md                              |  7 ++
 2 files changed, 56 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6fdd0e32/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 2bd4a46..07cbcec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -79,6 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 
   private val NOT_STARTED = "<Not Started>"
 
+  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
+
   // Interval between safemode checks.
   private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
     "spark.history.fs.safemodeCheck.interval", "5s")
@@ -89,6 +91,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // Interval between each cleaner checks for event logs to delete
   private val CLEAN_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.cleaner.interval", "1d")
 
+  // Number of threads used to replay event logs.
+  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
+    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
   private val logDir = conf.getOption("spark.history.fs.logDirectory")
     .map { d => Utils.resolveURI(d).toString }
     .getOrElse(DEFAULT_LOG_DIR)
@@ -129,11 +135,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   }
 
   /**
-   * An Executor to fetch and parse log files.
+   * Fixed size thread pool to fetch and parse log files.
    */
   private val replayExecutor: ExecutorService = {
     if (!conf.contains("spark.testing")) {
-      ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
+      ThreadUtils.newDaemonFixedThreadPool(NUM_PROCESSING_THREADS, "log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
     }
@@ -297,10 +303,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       if (logInfos.nonEmpty) {
         logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
       }
-      logInfos.grouped(20)
-        .map { batch =>
+      logInfos.map { file =>
           replayExecutor.submit(new Runnable {
-            override def run(): Unit = mergeApplicationListing(batch)
+            override def run(): Unit = mergeApplicationListing(file)
           })
         }
         .foreach { task =>
@@ -385,9 +390,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   /**
    * Replay the log files in the list and merge the list of old applications with new ones
    */
-  private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
-    val newAttempts = logs.flatMap { fileStatus =>
-      try {
+  private def mergeApplicationListing(fileStatus: FileStatus): Unit = {
+    val newAttempts = try {
         val bus = new ReplayListenerBus()
         val res = replay(fileStatus, bus)
         res match {
@@ -403,7 +407,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
             e)
           None
       }
-    }
 
     if (newAttempts.isEmpty) {
       return
@@ -413,45 +416,48 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     // contains both the new app attempt, and those that were already loaded in the existing apps
     // map. If an attempt has been updated, it replaces the old attempt in the list.
     val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
-    newAttempts.foreach { attempt =>
-      val appInfo = newAppMap.get(attempt.appId)
-        .orElse(applications.get(attempt.appId))
-        .map { app =>
-          val attempts =
-            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
-          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
-            attempts.sortWith(compareAttemptInfo))
-        }
-        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
-      newAppMap(attempt.appId) = appInfo
-    }
 
-    // Merge the new app list with the existing one, maintaining the expected ordering (descending
-    // end time). Maintaining the order is important to avoid having to sort the list every time
-    // there is a request for the log list.
-    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
-    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-      if (!mergedApps.contains(info.id)) {
-        mergedApps += (info.id -> info)
+    applications.synchronized {
+      newAttempts.foreach { attempt =>
+        val appInfo = newAppMap.get(attempt.appId)
+          .orElse(applications.get(attempt.appId))
+          .map { app =>
+            val attempts =
+              app.attempts.filter(_.attemptId != attempt.attemptId) ++ List(attempt)
+            new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+              attempts.sortWith(compareAttemptInfo))
+          }
+          .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+        newAppMap(attempt.appId) = appInfo
       }
-    }
 
-    val newIterator = newApps.iterator.buffered
-    val oldIterator = applications.values.iterator.buffered
-    while (newIterator.hasNext && oldIterator.hasNext) {
-      if (newAppMap.contains(oldIterator.head.id)) {
-        oldIterator.next()
-      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
-        addIfAbsent(newIterator.next())
-      } else {
-        addIfAbsent(oldIterator.next())
+      // Merge the new app list with the existing one, maintaining the expected ordering (descending
+      // end time). Maintaining the order is important to avoid having to sort the list every time
+      // there is a request for the log list.
+      val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+        if (!mergedApps.contains(info.id)) {
+          mergedApps += (info.id -> info)
+        }
       }
-    }
-    newIterator.foreach(addIfAbsent)
-    oldIterator.foreach(addIfAbsent)
 
-    applications = mergedApps
+      val newIterator = newApps.iterator.buffered
+      val oldIterator = applications.values.iterator.buffered
+      while (newIterator.hasNext && oldIterator.hasNext) {
+        if (newAppMap.contains(oldIterator.head.id)) {
+          oldIterator.next()
+        } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+          addIfAbsent(newIterator.next())
+        } else {
+          addIfAbsent(oldIterator.next())
+        }
+      }
+      newIterator.foreach(addIfAbsent)
+      oldIterator.foreach(addIfAbsent)
+
+      applications = mergedApps
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6fdd0e32/docs/monitoring.md
----------------------------------------------------------------------
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 32d2e02..9dcb070 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -189,6 +189,13 @@ The history server can be configured as follows:
       Job history files older than this will be deleted when the filesystem history cleaner runs.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.numReplayThreads</td>
+    <td>25% of available cores</td>
+    <td>
+      Number of threads that will be used by history server to process event logs.
+    </td>
+  </tr>
 </table>
 
 Note that in all of these UIs, the tables are sortable by clicking their headers,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org