You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2020/07/08 13:04:15 UTC

[spark] branch branch-3.0 updated: [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new ac2c6cd3 [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing
ac2c6cd3 is described below

commit ac2c6cd3cfae369f6d1af6abea567263d34a29b2
Author: Zhen Li <zh...@microsoft.com>
AuthorDate: Wed Jul 8 21:58:45 2020 +0900

    [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing
    
    ### What changes were proposed in this pull request?
    
    Update ApplicationStoreInfo.size to real size during HistoryServerDiskManager initializing.
    
    ### Why are the changes needed?
    
    This PR is for fixing bug [32024](https://issues.apache.org/jira/browse/SPARK-32024). We found after history server restart, below error would randomly happen: "java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, delta = -***)" from `HistoryServerDiskManager`.
    ![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG)
    
    **Cause**: Reading data from level db would trigger table file compaction, which may also trigger size of level db directory changes.  This size change may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When service restarts, `currentUsage` is calculated from real directory size, but `ApplicationStoreInfo` are loaded from leveldb, then `currentUsage` may be less than sum of `ApplicationStoreInfo.size`. In `makeRoom()` function, `ApplicationStoreInfo.size` is used to [...]
    **Reproduce**: we can reproduce this issue in dev environment by reducing config value of "spark.history.retainedApplications" and "spark.history.store.maxDiskUsage" to some small values. Here are steps: 1. start history server, load some applications and access some pages (maybe "stages" page to trigger leveldb compaction). 2. restart HS, and refresh pages.
    I also added a UT to simulate this case in `HistoryServerDiskManagerSuite`.
    **Benefit**: this change would help improve history server reliability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Add unit test and manually tested it.
    
    Closes #28859 from zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize.
    
    Authored-by: Zhen Li <zh...@microsoft.com>
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    (cherry picked from commit 8e7fc04637bbb8d2fdc2c758746e0eaf496c4d92)
    Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
 .../deploy/history/HistoryServerDiskManager.scala  | 21 ++++++++--
 .../history/HistoryServerDiskManagerSuite.scala    | 46 ++++++++++++++++++++++
 2 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 0a1f333..4ddd6d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -75,14 +75,29 @@ private class HistoryServerDiskManager(
 
     // Go through the recorded store directories and remove any that may have been removed by
     // external code.
-    val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
-      !new File(info.path).exists()
-    }.toSeq
+    val (existences, orphans) = listing
+      .view(classOf[ApplicationStoreInfo])
+      .asScala
+      .toSeq
+      .partition { info =>
+        new File(info.path).exists()
+      }
 
     orphans.foreach { info =>
       listing.delete(info.getClass(), info.path)
     }
 
+    // Reading level db would trigger table file compaction, then it may cause size of level db
+    // directory changed. When service restarts, "currentUsage" is calculated from real directory
+    // size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals
+    // sum of "ApplicationStoreInfo.size".
+    existences.foreach { info =>
+      val fileSize = sizeOf(new File(info.path))
+      if (fileSize != info.size) {
+        listing.write(info.copy(size = fileSize))
+      }
+    }
+
     logInfo("Initialized disk manager: " +
       s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
       s"max usage = ${Utils.bytesToString(maxUsage)}")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index f78469e..b17880a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -158,4 +158,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(manager.approximateSize(50L, true) > 50L)
   }
 
+  test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
+    val manager = mockManager()
+    val leaseA = manager.lease(2)
+    doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
+    val dstA = leaseA.commit("app1", None)
+    assert(manager.free() === 0)
+    assert(manager.committed() === 3)
+    // Listing store tracks dstA now.
+    assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3)
+
+    // Simulate: service restarts, new disk manager (manager1) is initialized.
+    val manager1 = mockManager()
+    // Simulate: event KVstore compaction before restart, directory size reduces.
+    doReturn(2L).when(manager1).sizeOf(meq(dstA))
+    doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
+    manager1.initialize()
+    // "ApplicationStoreInfo.size" is updated for dstA.
+    assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2)
+    assert(manager1.free() === 1)
+    // If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException"
+    // would be thrown.
+    val leaseB = manager1.lease(2)
+    assert(manager1.free() === 1)
+    doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
+    val dstB = leaseB.commit("app2", None)
+    assert(manager1.committed() === 2)
+    // Listing store tracks dstB only, dstA is evicted by "makeRoom()".
+    assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2)
+
+    val manager2 = mockManager()
+    // Simulate: cache entities are written after replaying, directory size increases.
+    doReturn(3L).when(manager2).sizeOf(meq(dstB))
+    doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
+    manager2.initialize()
+    // "ApplicationStoreInfo.size" is updated for dstB.
+    assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3)
+    assert(manager2.free() === 0)
+    val leaseC = manager2.lease(2)
+    doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
+    val dstC = leaseC.commit("app3", None)
+    assert(manager2.free() === 1)
+    assert(manager2.committed() === 2)
+    // Listing store tracks dstC only, dstB is evicted by "makeRoom()".
+    assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org