Posted to commits@spark.apache.org by mr...@apache.org on 2022/11/30 03:52:55 UTC
[spark] branch master updated: [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new bbab0afb9a6 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully
bbab0afb9a6 is described below
commit bbab0afb9a6919694cda5b9d490203af93a23460
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Nov 29 21:52:43 2022 -0600
[SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully
### What changes were proposed in this pull request?
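`BlockManager#removeBlockInternal` should ensure that the block's lock is released even when an exception is thrown during removal.
`removeBlockInternal` now calls `blockInfoManager.removeBlock` from a `finally` block if the normal call was not reached. This guarantees that the block's info and write lock are always removed.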
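The following is a minimal sketch of the flag-plus-`finally` pattern the patch uses. The memory, disk, and master calls are replaced by log entries. `FinallyCleanupSketch`, `removeInternal`, `run`, and the `failBefore`/`failAfter` switches are hypothetical names for illustration, not Spark APIs. `removeBlock` stays at its normal position before `reportBlockStatus`. The `finally` block runs it only when an earlier step threw, so it is called exactly once in every case:

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical sketch of the cleanup pattern in removeBlockInternal; not Spark code.
object FinallyCleanupSketch {
  def removeInternal(log: ListBuffer[String], failBefore: Boolean, failAfter: Boolean): Unit = {
    var hasRemoveBlock = false
    try {
      // Stand-in for a failure in getCurrentBlockStatus / DiskStore#remove.
      if (failBefore) throw new java.io.IOException("disk issue")
      log += "removeBlock"
      hasRemoveBlock = true
      // Stand-in for a failure while reporting the status to the master.
      if (failAfter) throw new IllegalStateException("report failed")
      log += "reportBlockStatus"
    } finally {
      // Only clean up here if the normal removeBlock call was skipped.
      if (!hasRemoveBlock) log += "removeBlock (from finally)"
    }
  }

  // Runs one scenario, swallows the expected exception, and returns the recorded steps.
  def run(failBefore: Boolean, failAfter: Boolean): List[String] = {
    val log = ListBuffer.empty[String]
    try removeInternal(log, failBefore, failAfter)
    catch { case _: Exception => () }
    log.toList
  }
}
```

`run(failBefore = true, failAfter = false)` records only `removeBlock (from finally)`. `run(failBefore = false, failAfter = true)` records `removeBlock` once, with no second call from `finally`. Placing `removeBlock` only in `finally` would also handle the failure case. However, on the success path it would then run after `reportBlockStatus`. The flag preserves the original ordering.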
### Why are the changes needed?
When the driver submits a job, the following sequence can hang `DAGScheduler`:
1. `DAGScheduler` calls `sc.broadcast(taskBinaryBytes)`.
2. `TorrentBroadcast#writeBlocks` fails during `blockManager#putBytes` because of a disk problem.
3. `BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up the block.
4. `BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up the block on disk.
5. `DiskStore#remove` tries to create the block's directory because it does not exist, and this throws an exception.
6. The exception skips `blockInfoManager.removeBlock`. As a result, the block's info and write lock are never removed from `BlockInfoManager#blockInfoWrappers`.
7. The catch block in `TorrentBroadcast#writeBlocks` calls `blockManager.removeBroadcast` to clean up the broadcast.
8. Because the block's lock in `BlockInfoManager#blockInfoWrappers` was never released, `removeBroadcast` waits for it forever. This blocks the `dag-scheduler-event-loop` thread of `DAGScheduler`.
```
22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
```
```
"dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
```
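To make the hang concrete, here is a heavily simplified, hypothetical model of a per-block write lock. `ToyBlockInfoManager` and `LeakedLockDemo` are illustrative names, not Spark classes. In the dump above, the real `BlockInfoManager` waits on a condition with no timeout. The toy uses a timeout only so the demo terminates, and its lock is not reentrant. If cleanup never reaches `removeBlock`, a later `lockForWriting` on the same block cannot succeed:

```scala
import scala.collection.mutable

// Hypothetical stand-in for BlockInfoManager: at most one writer per block id.
final class ToyBlockInfoManager {
  private val locked = mutable.Set.empty[String]

  // Waits up to timeoutMs for the block's write lock; returns whether it was acquired.
  def lockForWriting(blockId: String, timeoutMs: Long): Boolean = synchronized {
    val deadline = System.currentTimeMillis() + timeoutMs
    var remaining = timeoutMs
    while (locked.contains(blockId) && remaining > 0) {
      wait(remaining) // re-checked in the loop, so spurious wakeups are harmless
      remaining = deadline - System.currentTimeMillis()
    }
    if (locked.contains(blockId)) false
    else {
      locked += blockId
      true
    }
  }

  // Drops the block together with its lock and wakes any waiting writer.
  def removeBlock(blockId: String): Unit = synchronized {
    locked -= blockId
    notifyAll()
  }
}

object LeakedLockDemo {
  // The failed put holds the write lock; cleanup may or may not reach removeBlock.
  // Returns whether a later lockForWriting (as in removeBroadcast) succeeds.
  def secondWriterAcquires(cleanupReachedRemoveBlock: Boolean): Boolean = {
    val manager = new ToyBlockInfoManager
    val blockId = "broadcast_0_piece0"
    manager.lockForWriting(blockId, timeoutMs = 0L)
    if (cleanupReachedRemoveBlock) manager.removeBlock(blockId)
    manager.lockForWriting(blockId, timeoutMs = 200L)
  }
}
```

`LeakedLockDemo.secondWriterAcquires(false)` returns `false` after the 200 ms timeout, and `secondWriterAcquires(true)` returns `true`.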
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
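To simulate disk problems, an exception was thrown in `DiskBlockManager#getFile` just before `Files.createDirectory`:
```scala
// Test-only change in DiskBlockManager#getFile: fail for broadcast piece files
// instead of creating the sub-directory.
if (filename.contains("piece")) {
  throw new java.io.IOException("disk issue")
}
Files.createDirectory(path)
```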
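Then start `spark-shell` and run a query:
```
./bin/spark-shell
```
```scala
spark.sql("select 1").collect()
```
With the patch applied, the job fails with the injected error instead of hanging:
```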
22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue.
22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue
java.io.IOException: disk issue
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109)
at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160)
at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484)
at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378)
at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
```
Closes #38467 from cxzl25/SPARK-40987.
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Mridul <mridul<at>gmail.com>
---
.../org/apache/spark/storage/BlockManager.scala | 43 +++++++++++++---------
1 file changed, 26 insertions(+), 17 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 42a6cddc55f..d5fde96b146 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1991,23 +1991,32 @@ private[spark] class BlockManager(
* lock on the block.
*/
private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
- val blockStatus = if (tellMaster) {
- val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
- Some(getCurrentBlockStatus(blockId, blockInfo))
- } else None
-
- // Removals are idempotent in disk store and memory store. At worst, we get a warning.
- val removedFromMemory = memoryStore.remove(blockId)
- val removedFromDisk = diskStore.remove(blockId)
- if (!removedFromMemory && !removedFromDisk) {
- logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
- }
-
- blockInfoManager.removeBlock(blockId)
- if (tellMaster) {
- // Only update storage level from the captured block status before deleting, so that
- // memory size and disk size are being kept for calculating delta.
- reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+ var hasRemoveBlock = false
+ try {
+ val blockStatus = if (tellMaster) {
+ val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+ Some(getCurrentBlockStatus(blockId, blockInfo))
+ } else None
+
+ // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+ val removedFromMemory = memoryStore.remove(blockId)
+ val removedFromDisk = diskStore.remove(blockId)
+ if (!removedFromMemory && !removedFromDisk) {
+ logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+ }
+
+ blockInfoManager.removeBlock(blockId)
+ hasRemoveBlock = true
+ if (tellMaster) {
+ // Only update storage level from the captured block status before deleting, so that
+ // memory size and disk size are being kept for calculating delta.
+ reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+ }
+ } finally {
+ if (!hasRemoveBlock) {
+ logWarning(s"Block $blockId was not removed normally.")
+ blockInfoManager.removeBlock(blockId)
+ }
}
}