Posted to commits@spark.apache.org by do...@apache.org on 2020/12/18 23:24:00 UTC

[spark] branch branch-3.0 updated: [SPARK-33841][CORE][3.0] Fix issue with jobs disappearing intermittently from the SHS under high load

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7881622  [SPARK-33841][CORE][3.0] Fix issue with jobs disappearing intermittently from the SHS under high load
7881622 is described below

commit 7881622899082f73b99d9e92f6b08979005ba4df
Author: Vlad Glinsky <vl...@gmail.com>
AuthorDate: Fri Dec 18 15:19:09 2020 -0800

    [SPARK-33841][CORE][3.0] Fix issue with jobs disappearing intermittently from the SHS under high load
    
    ### What changes were proposed in this pull request?
    
    Mark SHS event log entries that were `processing` at the beginning of the `checkForLogs` run as not stale, and check for this mark before deleting an event log. This fixes an issue where a job displayed in the SHS would disappear after some time and then reappear several minutes later.
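    
    The mark-then-check idea can be sketched in isolation as follows. This is a simplified model, not the Spark implementation: `LogEntry`, `StaleScanSketch`, `staleToDelete`, and the `useFix` flag are hypothetical names, and the two `Set` parameters stand in for the results of `isProcessing` at the two points in the scan.
    
    ```scala
    import scala.collection.mutable

    // Hypothetical, simplified stand-in for a listing entry; not Spark's LogInfo.
    final case class LogEntry(logPath: String, lastProcessed: Long)

    object StaleScanSketch {
      /**
       * Returns the paths that one scan would delete as stale.
       *
       * @param listing          entries currently known to the listing
       * @param processingAtScan paths in flight when the scan lists the directory
       * @param processingAtEnd  paths still in flight at the stale check
       * @param newLastScanTime  timestamp taken at the start of this scan
       * @param useFix           whether to apply the notStale guard
       */
      def staleToDelete(
          listing: Seq[LogEntry],
          processingAtScan: Set[String],
          processingAtEnd: Set[String],
          newLastScanTime: Long,
          useFix: Boolean): Set[String] = {
        // Remember every path that was in flight when the scan started.
        val notStale = mutable.HashSet[String]()
        if (useFix) notStale ++= processingAtScan

        listing
          // Entries not refreshed during this scan look stale.
          .filter(_.lastProcessed < newLastScanTime)
          // Original guard: skip entries that are still processing.
          .filterNot(e => processingAtEnd.contains(e.logPath))
          // Added guard: skip entries that were processing at scan start.
          .filterNot(e => notStale.contains(e.logPath))
          .map(_.logPath)
          .toSet
      }
    }
    ```
    
    In this model, an entry that was in flight at the start of the scan but finished before the stale check (`processingAtScan = Set("app-1")`, `processingAtEnd = Set.empty`) is returned for deletion when `useFix` is `false` and is kept when it is `true`.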
    
    ### Why are the changes needed?
    
    The issue is caused by [SPARK-29043](https://issues.apache.org/jira/browse/SPARK-29043), which was designed to improve the concurrent performance of the History Server. The [change](https://github.com/apache/spark/pull/25797/files#) breaks the ["app deletion" logic](https://github.com/apache/spark/pull/25797/files#diff-128a6af0d78f4a6180774faedb335d6168dfc4defff58f5aa3021fc1bd767bc0R563) because it lacks proper synchronization for `processing` event log entries. Since SHS now [filt [...]
    
    The issue can be reproduced by generating a large number of event logs and uploading them to the SHS event log directory on S3. Around 800 copies (82.6 MB) of an event log file were created using the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) script. The total number of applications reported by the SHS behaved strangely: at first the number increased as expected, but on the next page refresh it decreased. No errors we [...]
    
    241 entities are displayed at `20:50:42`:
    ![1-241-entities-at-20-50](https://user-images.githubusercontent.com/61428392/102611539-c2138d00-4137-11eb-9bbd-d77b22041f3b.png)
    203 entities are displayed at `20:52:17`:
    ![2-203-entities-at-20-52](https://user-images.githubusercontent.com/61428392/102611561-cdff4f00-4137-11eb-91ed-7405fe58a695.png)
    The number of loaded applications over time:
    ![4-loaded-applications](https://user-images.githubusercontent.com/61428392/102611586-d8b9e400-4137-11eb-8747-4007fc5469de.png)
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. SHS users will no longer see the number of displayed applications decrease periodically.
    
    ### How was this patch tested?
    
    Tested using the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) script:
    * Build SHS with the proposed change
    * Download Hadoop AWS and AWS Java SDK
    * Prepare an S3 bucket and a user for programmatic access, grant the required roles to the user, and get the access key and secret key
    * Configure SHS to read event logs from S3
    * Start the [monitor](https://github.com/vladhlinsky/shs-monitor/blob/main/monitor.sh) script to query the SHS API
    * Run 8 [producers](https://github.com/vladhlinsky/shs-monitor/blob/main/producer.sh) for ~10 mins to create 805 event log copies (83.1 MB)
    * Wait for SHS to load all the applications
    * Verify that the number of loaded applications never decreases over time
    ![5-loaded-applications-fixed](https://user-images.githubusercontent.com/61428392/102617363-bf1d9a00-4141-11eb-9bae-f982d02fd30f.png)
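    
    The final verification step amounts to checking that the sampled application counts never drop. A minimal sketch of such a check, assuming the samples have already been collected into a sequence (`LoadedAppsCheck` and `firstDrop` are hypothetical names and are not part of shs-monitor):
    
    ```scala
    object LoadedAppsCheck {
      /** Returns the index of the first sample that is lower than its predecessor, if any. */
      def firstDrop(counts: Seq[Int]): Option[Int] =
        // Pair each sample with the one that follows it and look for a decrease.
        counts.zip(counts.drop(1)).indexWhere { case (prev, next) => next < prev } match {
          case -1 => None
          case i => Some(i + 1)
        }
    }
    ```
    
    For the two counts reported before the fix, `LoadedAppsCheck.firstDrop(Seq(241, 203))` flags the second sample, while a series that only grows or stays flat yields `None`.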
    
    For more details, please refer to the [shs-monitor](https://github.com/vladhlinsky/shs-monitor) repository.
    
    Closes #30842 from vladhlinsky/SPARK-33841-branch-3.0.
    
    Authored-by: Vlad Glinsky <vl...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/deploy/history/FsHistoryProvider.scala   | 26 +++++++++++++++++-----
 1 file changed, 20 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index 7e63d55..b31333f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -460,9 +460,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val newLastScanTime = clock.getTimeMillis()
       logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
 
+      // Mark entries that are processing as not stale. Such entries do not have a chance to be
+      // updated with the new 'lastProcessed' time and thus any entity that completes processing
+      // right after this check and before the check for stale entities will be identified as stale
+      // and will be deleted from the UI until the next 'checkForLogs' run.
+      val notStale = mutable.HashSet[String]()
       val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
         .filter { entry => !isBlacklisted(entry.getPath) }
-        .filter { entry => !isProcessing(entry.getPath) }
+        .filter { entry =>
+          if (isProcessing(entry.getPath)) {
+            notStale.add(entry.getPath.toString())
+            false
+          } else {
+            true
+          }
+        }
         .flatMap { entry => EventLogFileReader(fs, entry) }
         .filter { reader =>
           try {
@@ -562,12 +574,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         .last(newLastScanTime - 1)
         .asScala
         .toList
-      stale.filterNot(isProcessing).foreach { log =>
-        log.appId.foreach { appId =>
-          cleanAppData(appId, log.attemptId, log.logPath)
-          listing.delete(classOf[LogInfo], log.logPath)
+      stale.filterNot(isProcessing)
+        .filterNot(info => notStale.contains(info.logPath))
+        .foreach { log =>
+          log.appId.foreach { appId =>
+            cleanAppData(appId, log.attemptId, log.logPath)
+            listing.delete(classOf[LogInfo], log.logPath)
+          }
         }
-      }
 
       lastScanTime.set(newLastScanTime)
     } catch {

