You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/06/30 18:28:46 UTC

spark git commit: [SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concurrent issue.

Repository: spark
Updated Branches:
  refs/heads/master 528c9281a -> 1fe08d62f


[SPARK-21223] Change fileToAppInfo in FsHistoryProvider to fix concurrent issue.

# What issue does this PR address?
Jira: https://issues.apache.org/jira/browse/SPARK-21223
Fix the thread-safety issue in FsHistoryProvider.
Currently, the Spark HistoryServer uses a HashMap named fileToAppInfo in class FsHistoryProvider to store the mapping from event log path to attemptInfo.
When a thread pool is used to replay the log files in the list and merge the list of old applications with new ones, multiple threads may update fileToAppInfo at the same time, which may cause thread-safety issues, such as falling into an infinite loop when the hash table's resize function is called.

Author: 曾林西 <ze...@meituan.com>

Closes #18430 from zenglinxi0615/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1fe08d62
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1fe08d62
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1fe08d62

Branch: refs/heads/master
Commit: 1fe08d62f022e12f2f0161af5d8f9eac51baf1b9
Parents: 528c928
Author: 曾林西 <ze...@meituan.com>
Authored: Fri Jun 30 19:28:43 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jun 30 19:28:43 2017 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1fe08d62/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
index d05ca14..b2a50bd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
 
 import java.io.{FileNotFoundException, IOException, OutputStream}
 import java.util.UUID
-import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, Future, TimeUnit}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.mutable
@@ -122,7 +122,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  val fileToAppInfo = new mutable.HashMap[Path, FsApplicationAttemptInfo]()
+  val fileToAppInfo = new ConcurrentHashMap[Path, FsApplicationAttemptInfo]()
 
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
@@ -321,7 +321,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       // scan for modified applications, replay and merge them
       val logInfos: Seq[FileStatus] = statusList
         .filter { entry =>
-          val prevFileSize = fileToAppInfo.get(entry.getPath()).map{_.fileSize}.getOrElse(0L)
+          val fileInfo = fileToAppInfo.get(entry.getPath())
+          val prevFileSize = if (fileInfo != null) fileInfo.fileSize else 0L
           !entry.isDirectory() &&
             // FsHistoryProvider generates a hidden file which can't be read.  Accidentally
             // reading a garbage file is safe, but we would log an error which can be scary to
@@ -475,7 +476,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           fileStatus.getLen(),
           appListener.appSparkVersion.getOrElse("")
         )
-        fileToAppInfo(logPath) = attemptInfo
+        fileToAppInfo.put(logPath, attemptInfo)
         logDebug(s"Application log ${attemptInfo.logPath} loaded successfully: $attemptInfo")
         Some(attemptInfo)
       } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org