You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/04 11:09:36 UTC

[GitHub] attilapiros commented on a change in pull request #23614: [SPARK-26689][CORE]Support blacklisting bad disk directory and retry in DiskBlockManager

attilapiros commented on a change in pull request #23614: [SPARK-26689][CORE]Support blacklisting bad disk directory and retry in DiskBlockManager
URL: https://github.com/apache/spark/pull/23614#discussion_r253428199
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
 ##########
 @@ -48,32 +55,71 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  private[spark] val badDirs = ArrayBuffer[File]()
+  private[spark] val dirToBlacklistExpiryTime = new HashMap[File, Long]
+  // Filename hash to dirId, it should be small enough to put into memory
+  private[spark] val migratedDirIdIndex = new ConcurrentHashMap[Int, Int].asScala
+
   private val shutdownHook = addShutdownHook()
 
   /** Looks up a file by hashing it into one of our local subdirectories. */
   // This method should be kept in sync with
   // org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
   def getFile(filename: String): File = {
+    var mostRecentFailure: Exception = null
     // Figure out which local directory it hashes to, and which subdirectory in that
     val hash = Utils.nonNegativeHash(filename)
-    val dirId = hash % localDirs.length
+    val dirId = migratedDirIdIndex.getOrElse(hash, hash % localDirs.length)
     val subDirId = (hash / localDirs.length) % subDirsPerLocalDir
 
     // Create the subdirectory if it doesn't already exist
     val subDir = subDirs(dirId).synchronized {
+      // Update blacklist
+      val now = clock.getTimeMillis()
+      val unblacklisted = badDirs.filter(now >= dirToBlacklistExpiryTime(_))
+      unblacklisted.foreach { dir =>
+        badDirs -= dir
+        dirToBlacklistExpiryTime.remove(dir)
+      }
+
       val old = subDirs(dirId)(subDirId)
       if (old != null) {
         old
       } else {
-        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
-        if (!newDir.exists() && !newDir.mkdir()) {
-          throw new IOException(s"Failed to create local dir in $newDir.")
+        assert(!migratedDirIdIndex.contains(dirId))
+        var succeed = false
+        var newDir: File = null
+        for (attempt <- 0 until maxRetries if !succeed) {
+          val isBlacklisted = badDirs.contains(localDirs(dirId))
+          val goodDirId = if (isBlacklisted) {
+            localDirs.indexWhere(!badDirs.contains(_))
+          } else {
+            dirId
+          }
+          try {
+            if (goodDirId < 0) {
+              throw new IOException("No good disk directories available")
+            }
+            newDir = new File(localDirs(goodDirId), "%02x".format(subDirId))
+            if (!newDir.exists() && !newDir.mkdir()) {
+              throw new IOException(s"Failed to create local dir in $newDir.")
+            }
+            subDirs(goodDirId)(subDirId) = newDir
+            if (goodDirId != dirId) {
+              migratedDirIdIndex.put(hash, goodDirId)
+            }
+            succeed = true
+          } catch {
+            case e: IOException =>
+              logError(s"Failed to looking up file $filename in attempt $attempt", e)
+              badDirs += localDirs(dirId)
 
 Review comment:
   @liupc race condition still possible if two `getFile` calls are using different `dirId` (so the locks are different) and both directories are corrupt  

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org