You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/11/06 00:34:20 UTC

[spark] branch branch-3.2 updated: [SPARK-36998][CORE] Handle concurrent eviction of same application in SHS

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 248e07b  [SPARK-36998][CORE] Handle concurrent eviction of same application in SHS
248e07b is described below

commit 248e07b49187bc7082e6cb2b0d9daa4b48ffe3cb
Author: Thejdeep Gudivada <tg...@linkedin.com>
AuthorDate: Fri Nov 5 17:32:32 2021 -0700

    [SPARK-36998][CORE] Handle concurrent eviction of same application in SHS
    
    ### What changes were proposed in this pull request?
     To gracefully handle the error thrown when we try to make room for parsing of different applications and they try to evict the same application by deleting the directory path.
    
    Also, added a test for `deleteStore` in `HistoryServerDiskManagerSuite`
    
     ### Why are the changes needed?
     Otherwise, an NoSuchFileException is thrown when it cannot find the directory path to exist.
    
     ### Does this PR introduce _any_ user-facing change?
     no
    
     ### How was this patch tested?
    Added a unit test for `deleteStore` but not specifically testing the concurrency fix.
    
    Closes #34276 from thejdeep/SPARK-36998.
    
    Authored-by: Thejdeep Gudivada <tg...@linkedin.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 39ad0d782cd708462d9dbd870fd23d7bb7c091a3)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../deploy/history/HistoryServerDiskManager.scala  | 10 ++++++--
 .../history/HistoryServerDiskManagerSuite.scala    | 30 +++++++++++++++++++---
 2 files changed, 35 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 31f9d18..8a5b285 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.history
 
-import java.io.File
+import java.io.{File, IOException}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.JavaConverters._
@@ -210,7 +210,13 @@ private class HistoryServerDiskManager(
   def committed(): Long = committedUsage.get()
 
   private def deleteStore(path: File): Unit = {
-    FileUtils.deleteDirectory(path)
+    try {
+      FileUtils.deleteDirectory(path)
+    } catch {
+      // Handle simultaneous eviction of the same app
+      case e: IOException =>
+        if (path.exists()) throw e
+    }
     listing.delete(classOf[ApplicationStoreInfo], path.getAbsolutePath())
   }
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index 9004e86..fecf905 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -21,8 +21,9 @@ import java.io.File
 
 import org.mockito.AdditionalAnswers
 import org.mockito.ArgumentMatchers.{anyBoolean, anyLong, eq => meq}
-import org.mockito.Mockito.{doAnswer, spy}
-import org.scalatest.BeforeAndAfter
+import org.mockito.Mockito.{doAnswer, spy, when}
+import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
+import org.scalatestplus.mockito.MockitoSugar.mock
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.config.History._
@@ -30,7 +31,8 @@ import org.apache.spark.status.KVUtils
 import org.apache.spark.util.{ManualClock, Utils}
 import org.apache.spark.util.kvstore.KVStore
 
-class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
+class HistoryServerDiskManagerSuite extends SparkFunSuite
+  with PrivateMethodTester with BeforeAndAfter {
 
   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
 
@@ -158,6 +160,28 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(manager.approximateSize(50L, true) > 50L)
   }
 
+  test("SPARK-36998: Should be able to delete a store") {
+    val manager = mockManager()
+    val tempDir = Utils.createTempDir()
+    tempDir.delete()
+    Seq(true, false).foreach { exists =>
+      val file = mock[File]
+      when(file.exists()).thenReturn(true).thenReturn(true).thenReturn(exists)
+      when(file.isDirectory).thenReturn(true)
+      when(file.toPath).thenReturn(tempDir.toPath)
+      when(file.getAbsolutePath).thenReturn(tempDir.getAbsolutePath)
+      val deleteStore = PrivateMethod[Unit]('deleteStore)
+      if (exists) {
+        val m = intercept[Exception] {
+          manager invokePrivate deleteStore(file)
+        }.getMessage
+        assert(m.contains("Unknown I/O error"))
+      } else {
+        manager invokePrivate deleteStore(file)
+      }
+    }
+  }
+
   test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
     val manager = mockManager()
     val leaseA = manager.lease(2)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org