Posted to commits@spark.apache.org by ka...@apache.org on 2020/07/08 13:04:15 UTC
[spark] branch branch-3.0 updated: [SPARK-32024][WEBUI] Update
ApplicationStoreInfo.size during HistoryServerDiskManager initializing
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new ac2c6cd3 [SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing
ac2c6cd3 is described below
commit ac2c6cd3cfae369f6d1af6abea567263d34a29b2
Author: Zhen Li <zh...@microsoft.com>
AuthorDate: Wed Jul 8 21:58:45 2020 +0900
[SPARK-32024][WEBUI] Update ApplicationStoreInfo.size during HistoryServerDiskManager initializing
### What changes were proposed in this pull request?
Update `ApplicationStoreInfo.size` to the real directory size during `HistoryServerDiskManager` initialization.
### Why are the changes needed?
This PR fixes bug [SPARK-32024](https://issues.apache.org/jira/browse/SPARK-32024). We found that after a history server restart, the following error would randomly occur: "java.lang.IllegalStateException: Disk usage tracker went negative (now = -***, delta = -***)" from `HistoryServerDiskManager`.
![Capture](https://user-images.githubusercontent.com/10524738/85034468-fda4ae80-b136-11ea-9011-f0c3e6508002.JPG)
**Cause**: Reading data from LevelDB can trigger table file compaction, which may change the size of the LevelDB directory. This size change may not be recorded in LevelDB (`ApplicationStoreInfo` in `listing`). When the service restarts, `currentUsage` is calculated from the real directory size, but the `ApplicationStoreInfo` entries are loaded from LevelDB, so `currentUsage` may be less than the sum of `ApplicationStoreInfo.size`. In the `makeRoom()` function, `ApplicationStoreInfo.size` is used to [...]
**Reproduce**: we can reproduce this issue in a dev environment by reducing the config values of "spark.history.retainedApplications" and "spark.history.store.maxDiskUsage" to small values. Here are the steps: 1. Start the history server, load some applications, and access some pages (for example the "stages" page, to trigger LevelDB compaction). 2. Restart the history server and refresh the pages.
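For step 1, a `spark-defaults.conf` along these lines should work; the specific values below are illustrative assumptions, not taken from the PR:

```
# Small limits to force frequent eviction in the history server
spark.history.retainedApplications   2
spark.history.store.maxDiskUsage     10m
spark.history.store.path             /tmp/spark-history-store
```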
I also added a unit test to simulate this case in `HistoryServerDiskManagerSuite`.
**Benefit**: this change improves history server reliability.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Add unit test and manually tested it.
Closes #28859 from zhli1142015/update-ApplicationStoreInfo.size-during-disk-manager-initialize.
Authored-by: Zhen Li <zh...@microsoft.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
(cherry picked from commit 8e7fc04637bbb8d2fdc2c758746e0eaf496c4d92)
Signed-off-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
---
.../deploy/history/HistoryServerDiskManager.scala | 21 ++++++++--
.../history/HistoryServerDiskManagerSuite.scala | 46 ++++++++++++++++++++++
2 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
index 0a1f333..4ddd6d9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerDiskManager.scala
@@ -75,14 +75,29 @@ private class HistoryServerDiskManager(
// Go through the recorded store directories and remove any that may have been removed by
// external code.
- val orphans = listing.view(classOf[ApplicationStoreInfo]).asScala.filter { info =>
- !new File(info.path).exists()
- }.toSeq
+ val (existences, orphans) = listing
+ .view(classOf[ApplicationStoreInfo])
+ .asScala
+ .toSeq
+ .partition { info =>
+ new File(info.path).exists()
+ }
orphans.foreach { info =>
listing.delete(info.getClass(), info.path)
}
+ // Reading LevelDB can trigger table file compaction, which may change the size of the
+ // LevelDB directory. When the service restarts, "currentUsage" is calculated from the real
+ // directory size. Update "ApplicationStoreInfo.size" to ensure "currentUsage" equals the
+ // sum of "ApplicationStoreInfo.size".
+ existences.foreach { info =>
+ val fileSize = sizeOf(new File(info.path))
+ if (fileSize != info.size) {
+ listing.write(info.copy(size = fileSize))
+ }
+ }
+
logInfo("Initialized disk manager: " +
s"current usage = ${Utils.bytesToString(currentUsage.get())}, " +
s"max usage = ${Utils.bytesToString(maxUsage)}")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
index f78469e..b17880a 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerDiskManagerSuite.scala
@@ -158,4 +158,50 @@ class HistoryServerDiskManagerSuite extends SparkFunSuite with BeforeAndAfter {
assert(manager.approximateSize(50L, true) > 50L)
}
+ test("SPARK-32024: update ApplicationStoreInfo.size during initializing") {
+ val manager = mockManager()
+ val leaseA = manager.lease(2)
+ doReturn(3L).when(manager).sizeOf(meq(leaseA.tmpPath))
+ val dstA = leaseA.commit("app1", None)
+ assert(manager.free() === 0)
+ assert(manager.committed() === 3)
+ // Listing store tracks dstA now.
+ assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 3)
+
+ // Simulate: service restarts, new disk manager (manager1) is initialized.
+ val manager1 = mockManager()
+ // Simulate: the event KVStore was compacted before restart, so the directory size shrank.
+ doReturn(2L).when(manager1).sizeOf(meq(dstA))
+ doReturn(2L).when(manager1).sizeOf(meq(new File(testDir, "apps")))
+ manager1.initialize()
+ // "ApplicationStoreInfo.size" is updated for dstA.
+ assert(store.read(classOf[ApplicationStoreInfo], dstA.getAbsolutePath).size === 2)
+ assert(manager1.free() === 1)
+ // If "ApplicationStoreInfo.size" is not correctly updated, "IllegalStateException"
+ // would be thrown.
+ val leaseB = manager1.lease(2)
+ assert(manager1.free() === 1)
+ doReturn(2L).when(manager1).sizeOf(meq(leaseB.tmpPath))
+ val dstB = leaseB.commit("app2", None)
+ assert(manager1.committed() === 2)
+ // Listing store tracks dstB only, dstA is evicted by "makeRoom()".
+ assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 2)
+
+ val manager2 = mockManager()
+ // Simulate: cache entities are written after replaying, directory size increases.
+ doReturn(3L).when(manager2).sizeOf(meq(dstB))
+ doReturn(3L).when(manager2).sizeOf(meq(new File(testDir, "apps")))
+ manager2.initialize()
+ // "ApplicationStoreInfo.size" is updated for dstB.
+ assert(store.read(classOf[ApplicationStoreInfo], dstB.getAbsolutePath).size === 3)
+ assert(manager2.free() === 0)
+ val leaseC = manager2.lease(2)
+ doReturn(2L).when(manager2).sizeOf(meq(leaseC.tmpPath))
+ val dstC = leaseC.commit("app3", None)
+ assert(manager2.free() === 1)
+ assert(manager2.committed() === 2)
+ // Listing store tracks dstC only, dstB is evicted by "makeRoom()".
+ assert(store.read(classOf[ApplicationStoreInfo], dstC.getAbsolutePath).size === 2)
+ }
+
}