You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/02 08:48:42 UTC

[spark] branch branch-3.3 updated: [SPARK-38675][CORE] Fix race during unlock in BlockInfoManager

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 4bbaf3777e9 [SPARK-38675][CORE] Fix race during unlock in BlockInfoManager
4bbaf3777e9 is described below

commit 4bbaf3777e9cd90151ec526a05dd67aab22da403
Author: Herman van Hovell <he...@databricks.com>
AuthorDate: Thu Jun 2 16:48:11 2022 +0800

    [SPARK-38675][CORE] Fix race during unlock in BlockInfoManager
    
    ### What changes were proposed in this pull request?
    This PR fixes a race in the `BlockInfoManager` between `unlock` and `releaseAllLocksForTask`, resulting in a negative reader count for a block (which trips an assert). This happens when the following events take place:
    
    1. [THREAD 1] calls `releaseAllLocksForTask`. This starts by collecting all the blocks to be unlocked for this task.
    2. [THREAD 2] calls `unlock` for a read lock for the same task (this means the block is also in the list collected in step 1). It then proceeds to unlock the block by decrementing the reader count.
    3. [THREAD 1] now starts to release the collected locks, it does this by decrementing the readers counts for blocks by the number of acquired read locks. The problem is that step 2 made the lock counts for blocks incorrect, and we decrement by one (or a few) too many. This triggers a negative reader count assert.
    
    We fix this by adding a check to `unlock` that makes sure we are not in the process of unlocking. We do this by checking if there is a multiset associated with the task that contains the read locks.
    
    ### Why are the changes needed?
    It is a bug. Not fixing this can cause negative reader counts for blocks, and this causes task failures.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a regression test in BlockInfoManager suite.
    
    Closes #35991 from hvanhovell/SPARK-38675.
    
    Authored-by: Herman van Hovell <he...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 078b505d2f0a0a4958dec7da816a7d672820b637)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/storage/BlockInfoManager.scala    | 15 +++++++----
 .../spark/storage/BlockInfoManagerSuite.scala      | 31 ++++++++++++++++++++++
 2 files changed, 41 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
index 5392c20eefb..9eb1418fd16 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala
@@ -360,12 +360,17 @@ private[storage] class BlockInfoManager extends Logging {
         info.writerTask = BlockInfo.NO_WRITER
         writeLocksByTask.get(taskAttemptId).remove(blockId)
       } else {
-        assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
-        info.readerCount -= 1
+        // There can be a race between unlock and releaseAllLocksForTask which causes negative
+        // reader counts. We need to check if the readLocksByTask per tasks are present, if they
+        // are not then we know releaseAllLocksForTask has already cleaned up the read lock.
         val countsForTask = readLocksByTask.get(taskAttemptId)
-        val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
-        assert(newPinCountForTask >= 0,
-          s"Task $taskAttemptId release lock on block $blockId more times than it acquired it")
+        if (countsForTask != null) {
+          assert(info.readerCount > 0, s"Block $blockId is not locked for reading")
+          info.readerCount -= 1
+          val newPinCountForTask: Int = countsForTask.remove(blockId, 1) - 1
+          assert(newPinCountForTask >= 0,
+            s"Task $taskAttemptId release lock on block $blockId more times than it acquired it")
+        }
       }
       condition.signalAll()
     }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 8ffc6798526..887644a8264 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -360,4 +360,35 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
     blockInfoManager.releaseAllLocksForTask(0)
     assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries - 1)
   }
+
+  test("SPARK-38675 - concurrent unlock and releaseAllLocksForTask calls should not fail") {
+    // Create block
+    val blockId = TestBlockId("block")
+    assert(blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo()))
+    blockInfoManager.unlock(blockId)
+
+    // Without the fix the block below fails in 50% of the time. By executing it
+    // 10 times we increase the chance of failing to ~99.9%.
+    (0 to 10).foreach { task =>
+      withTaskId(task) {
+        blockInfoManager.registerTask(task)
+
+        // Acquire read locks
+        (0 to 50).foreach { _ =>
+          assert(blockInfoManager.lockForReading(blockId).isDefined)
+        }
+
+        // Asynchronously release read locks.
+        val futures = (0 to 50).map { _ =>
+          Future(blockInfoManager.unlock(blockId, Option(0L)))
+        }
+
+        // Remove all lock and hopefully don't hit an assertion error
+        blockInfoManager.releaseAllLocksForTask(task)
+
+        // Wait until all futures complete for the next iteration
+        futures.foreach(ThreadUtils.awaitReady(_, 100.millis))
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org