Posted to commits@spark.apache.org by mr...@apache.org on 2022/11/30 03:53:22 UTC

[spark] branch branch-3.3 updated: [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 3f7ff3507c3 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully
3f7ff3507c3 is described below

commit 3f7ff3507c3964f965202e1fcb0048ac0a4af8e5
Author: sychen <sy...@ctrip.com>
AuthorDate: Tue Nov 29 21:52:43 2022 -0600

    [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully
    
    ### What changes were proposed in this pull request?
    `BlockManager#removeBlockInternal` should ensure the block's write lock is released even when removal fails partway through.
    If the normal removal path throws, `removeBlockInternal` now falls back to calling `blockInfoManager.removeBlock` in a `finally` block.
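    
    Condensed from the diff at the bottom of this message, the fix wraps the existing removal logic in a `try`/`finally` guard (the flag name matches the committed code):
    
    ```scala
    private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
      var hasRemoveBlock = false
      try {
        // ... existing removal logic; DiskStore#remove may throw here ...
        blockInfoManager.removeBlock(blockId)
        hasRemoveBlock = true
      } finally {
        if (!hasRemoveBlock) {
          logWarning(s"Block $blockId was not removed normally.")
          // Remove the block info anyway so the write lock is released.
          blockInfoManager.removeBlock(blockId)
        }
      }
    }
    ```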
    
    ### Why are the changes needed?
    When the driver submits a job, `DAGScheduler` calls `sc.broadcast(taskBinaryBytes)`.
    `TorrentBroadcast#writeBlocks` may fail inside `blockManager#putBytes` because of a disk problem.
    `BlockManager#doPut` then calls `BlockManager#removeBlockInternal` to clean up the block.
    `BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up the block on disk.
    `DiskStore#remove` tries to create the block's directory because it does not exist, and this attempt throws an exception on the failing disk.
    As a result, the block info and its lock in `BlockInfoManager#blockInfoWrappers` are never removed.
    The catch block in `TorrentBroadcast#writeBlocks` then calls `blockManager.removeBroadcast` to clean up the broadcast.
    Because the block's write lock in `BlockInfoManager#blockInfoWrappers` was never released, the `dag-scheduler-event-loop` thread of `DAGScheduler` waits for it forever.
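    
    A condensed illustration of the hang (a hypothetical stand-in, not the Spark API: a `java.util.concurrent.locks.ReentrantLock` plays the role of the per-block write lock):
    
    ```scala
    import java.util.concurrent.locks.ReentrantLock
    
    object LeakedLockDemo {
      def main(args: Array[String]): Unit = {
        val lock = new ReentrantLock() // stands in for the block's write lock
        val writer = new Thread(() => {
          lock.lock() // the putBytes path acquires the write lock
          // The cleanup path throws before releasing the lock, so the thread
          // dies still holding it (a ReentrantLock is never released
          // automatically when its owner terminates).
          throw new java.io.IOException("disk issue")
        })
        writer.start()
        writer.join()
        // The removeBroadcast path now parks forever, exactly like the
        // dag-scheduler-event-loop thread in the dump below.
        lock.lock()
      }
    }
    ```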
    
    ```
    22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX.
    22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
    ```
    
    ```
    "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]
       java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)
        at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)
        at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)
        at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)
        at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)
        at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)
        at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)
        at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)
        at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)
        at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
        at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
        at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Throw an exception before `Files.createDirectory` to simulate disk problems.
    
    DiskBlockManager#getFile
    ```scala
    if (filename.contains("piece")) {
      throw new java.io.IOException("disk issue")
    }
    Files.createDirectory(path)
    ```
    
    ```
    ./bin/spark-shell
    ```
    ```scala
    spark.sql("select 1").collect()
    ```
    
    ```
    22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue.
    22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast
    org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue
    java.io.IOException: disk issue
            at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109)
            at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160)
            at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153)
            at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879)
            at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998)
            at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484)
            at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378)
            at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419)
            at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170)
            at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164)
            at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
            at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
            at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
            at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164)
            at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)
            at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)
            at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78)
            at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)
            at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)
            at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)
            at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)
            at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)
    ```
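    
    With the fix in place, the injected `IOException` propagates and the job aborts cleanly (as shown above) instead of leaving the `dag-scheduler-event-loop` thread parked forever as in the earlier thread dump.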
    
    Closes #38467 from cxzl25/SPARK-40987.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Mridul <mridul<at>gmail.com>
    (cherry picked from commit bbab0afb9a6919694cda5b9d490203af93a23460)
    Signed-off-by: Mridul <mridulatgmail.com>
---
 .../org/apache/spark/storage/BlockManager.scala    | 43 +++++++++++++---------
 1 file changed, 26 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 53d2d054121..0ea72ec17ad 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1979,23 +1979,32 @@ private[spark] class BlockManager(
    * lock on the block.
    */
   private def removeBlockInternal(blockId: BlockId, tellMaster: Boolean): Unit = {
-    val blockStatus = if (tellMaster) {
-      val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
-      Some(getCurrentBlockStatus(blockId, blockInfo))
-    } else None
-
-    // Removals are idempotent in disk store and memory store. At worst, we get a warning.
-    val removedFromMemory = memoryStore.remove(blockId)
-    val removedFromDisk = diskStore.remove(blockId)
-    if (!removedFromMemory && !removedFromDisk) {
-      logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
-    }
-
-    blockInfoManager.removeBlock(blockId)
-    if (tellMaster) {
-      // Only update storage level from the captured block status before deleting, so that
-      // memory size and disk size are being kept for calculating delta.
-      reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+    var hasRemoveBlock = false
+    try {
+      val blockStatus = if (tellMaster) {
+        val blockInfo = blockInfoManager.assertBlockIsLockedForWriting(blockId)
+        Some(getCurrentBlockStatus(blockId, blockInfo))
+      } else None
+
+      // Removals are idempotent in disk store and memory store. At worst, we get a warning.
+      val removedFromMemory = memoryStore.remove(blockId)
+      val removedFromDisk = diskStore.remove(blockId)
+      if (!removedFromMemory && !removedFromDisk) {
+        logWarning(s"Block $blockId could not be removed as it was not found on disk or in memory")
+      }
+
+      blockInfoManager.removeBlock(blockId)
+      hasRemoveBlock = true
+      if (tellMaster) {
+        // Only update storage level from the captured block status before deleting, so that
+        // memory size and disk size are being kept for calculating delta.
+        reportBlockStatus(blockId, blockStatus.get.copy(storageLevel = StorageLevel.NONE))
+      }
+    } finally {
+      if (!hasRemoveBlock) {
+        logWarning(s"Block $blockId was not removed normally.")
+        blockInfoManager.removeBlock(blockId)
+      }
     }
   }
 

