Posted to commits@spark.apache.org by do...@apache.org on 2022/07/23 22:46:52 UTC

[spark] branch branch-3.2 updated: [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new f80559fc460 [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
f80559fc460 is described below

commit f80559fc4605b3f3d9e8d7631a9a5c9e1439efe6
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Sat Jul 23 15:45:58 2022 -0700

    [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a race condition in `RocksDBLoader.loadLibrary()`, which can occur if the thread which calls that method is interrupted.
    
    One of our jobs experienced a failure in `RocksDBLoader`:
    
    ```
    Caused by: java.lang.IllegalThreadStateException
            at java.lang.Thread.start(Thread.java:708)
            at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51)
    ```
    
    After investigation, we determined that this was due to task cancellation/interruption: if the task which starts the RocksDB library loading is interrupted, another thread may begin a load and crash with the thread state exception:
    
    - Although the `loadLibraryThread` child thread is uninterruptible, the task thread that calls `loadLibrary()` is still interruptible.
    - Let's say we have two tasks, A and B, both of which will call `RocksDBLoader.loadLibrary()`
    - Say that Task A wins the race to perform the load and enters the `synchronized` block in `loadLibrary()`, starts the `loadLibraryThread`, then blocks in the `loadLibraryThread.join()` call.
    - If Task A is interrupted, `join()` throws an `InterruptedException` and Task A exits the `synchronized` block in `loadLibrary()`, while the `loadLibraryThread` it started keeps running.
    - At this point, Task B enters the `synchronized` block of its own `loadLibrary()` call and sees `exception == null`, because the `loadLibraryThread` started by Task A is still running. Task B therefore calls `loadLibraryThread.start()` and hits the `IllegalThreadStateException`, because it tries to start an already-started thread.
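
    The crash comes down to a JVM rule: `Thread.start()` succeeds only while a thread is in the `NEW` state, and throws `IllegalThreadStateException` once the thread has been started, whether it is still running or has already terminated. The standalone Scala sketch below checks all three cases on a single thread; `ThreadStartRule` is a hypothetical name (not Spark code), and a `CountDownLatch` keeps the thread alive during the second call.

    ```scala
    import java.util.concurrent.CountDownLatch

    object ThreadStartRule {
      // Returns true if calling start() on `t` throws IllegalThreadStateException.
      private def startThrows(t: Thread): Boolean =
        try { t.start(); false }
        catch { case _: IllegalThreadStateException => true }

      /** Calls start() on one thread while it is NEW, while it is running, and after it terminates. */
      def check(): (Boolean, Boolean, Boolean) = {
        val gate = new CountDownLatch(1) // keeps the thread running until counted down
        val t = new Thread(new Runnable { override def run(): Unit = gate.await() })
        val whenNew = startThrows(t)        // first start(): accepted
        val whenRunning = startThrows(t)    // still blocked on the latch: rejected
        gate.countDown()
        t.join()
        val whenTerminated = startThrows(t) // already finished: rejected as well
        (whenNew, whenRunning, whenTerminated)
      }
    }
    ```

    Because both a running and a terminated thread reject `start()`, a guard that compares against `Thread.State.NEW` covers both cases, whereas checking `!isAlive()` would still allow `start()` on a terminated thread.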
    
    This PR fixes this issue by adding code to check `loadLibraryThread`'s state before calling `start()`: if the thread has already been started then we will skip the `start()` call and proceed directly to the `join()`. I also modified the logging so that we can detect when this case occurs.
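
    The interleaving and the guard can be replayed without RocksDB. The sketch below is a simplified model under stated assumptions, not Spark's code: `OneShotLoader` and `InterruptedLoadDemo` are hypothetical names, a plain `java.lang.Thread` stands in for Spark's `UninterruptibleThread`, a `CountDownLatch` (rather than a sleep) keeps the load running, and the hypothetical `checkState` flag toggles the `getState == Thread.State.NEW` check so the two behaviours can be compared.

    ```scala
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical stand-in for RocksDBLoader: `work` plays the role of the native library
    // load, and `checkState` toggles the state guard so both behaviours can be compared.
    class OneShotLoader(work: () => Unit, checkState: Boolean) {
      // null = not loaded yet; None = load succeeded; Some(e) = load failed with e.
      @volatile private var exception: Option[Throwable] = null

      private val loadThread = new Thread(new Runnable {
        override def run(): Unit =
          try { work(); exception = None }
          catch { case e: Throwable => exception = Some(e) }
      }, "one-shot-loader")

      def load(): Unit = synchronized {
        if (exception == null) {
          if (!checkState || loadThread.getState == Thread.State.NEW) {
            loadThread.start() // throws IllegalThreadStateException if already started
          }
          loadThread.join() // an interrupted caller leaves here via InterruptedException
          exception.foreach(throw _)
        }
      }
    }

    object InterruptedLoadDemo {
      // Spins until `t` is parked in join() (WAITING) or has finished (TERMINATED).
      private def awaitParkedOrDone(t: Thread): Unit =
        while (t.getState != Thread.State.WAITING && t.getState != Thread.State.TERMINATED) {
          Thread.sleep(1)
        }

      // Runs `body` on a new thread, recording any exception it throws in `error`.
      private def startTask(error: AtomicReference[Throwable])(body: => Unit): Thread = {
        val t = new Thread(new Runnable {
          override def run(): Unit = try body catch { case e: Throwable => error.set(e) }
        })
        t.start()
        t
      }

      /** Replays the Task A / Task B interleaving; returns the error seen by Task B, if any. */
      def run(checkState: Boolean): Option[Throwable] = {
        val release = new CountDownLatch(1) // keeps the load thread busy until counted down
        val loader = new OneShotLoader(() => release.await(), checkState)

        // Task A starts the load, blocks in join(), and is then interrupted.
        val errorA = new AtomicReference[Throwable]()
        val taskA = startTask(errorA)(loader.load())
        awaitParkedOrDone(taskA)
        taskA.interrupt()
        taskA.join()

        // Task B calls load() while the load thread started by Task A is still running.
        val errorB = new AtomicReference[Throwable]()
        val taskB = startTask(errorB)(loader.load())
        awaitParkedOrDone(taskB)
        release.countDown() // let the load finish
        taskB.join()
        Option(errorB.get)
      }
    }
    ```

    With `checkState = false`, Task B's `start()` call fails with `IllegalThreadStateException`, matching the reported stack trace; with `checkState = true`, Task B joins the load thread that Task A started and returns normally once the load completes.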
    
    ### Why are the changes needed?
    
    Fix a bug that can lead to task or job failures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    I reproduced the original race condition by adding a `Thread.sleep(10000)` to `loadLibraryThread.run()` (so that it wouldn't complete instantly) and then ran the following test:
    
    ```scala
      test("multi-threaded RocksDBLoader calls with interruption") {
    
        val taskThread = new Thread("interruptible Task Thread 1") {
          override def run(): Unit = {
            RocksDBLoader.loadLibrary()
          }
        }
    
        taskThread.start()
        // Give the thread time to enter the `loadLibrary()` call:
        Thread.sleep(1000)
        taskThread.interrupt()
        // Check that the load hasn't finished:
        assert(RocksDBLoader.exception == null)
        assert(RocksDBLoader.loadLibraryThread.getState != Thread.State.NEW)
        // Simulate the second task thread starting the load:
        RocksDBLoader.loadLibrary()
        // The load should finish successfully:
        assert(RocksDBLoader.exception.isEmpty)
      }
    ```
    
    This test failed prior to my changes and succeeds afterwards.
    
    I don't want to actually commit this test because I'm concerned about flakiness and false-negatives: in order to ensure that the test would have failed before my change, we need to carefully control the thread interleaving. This code rarely changes and is relatively simple, so I think the ROI on spending time to write and commit a reliable test is low.
    
    Closes #37260 from JoshRosen/rocksdbloader-fix.
    
    Authored-by: Josh Rosen <jo...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 9cee1bb2527a496943ffedbd935dc737246a2d89)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../spark/sql/execution/streaming/state/RocksDBLoader.scala   | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
index cc518192437..02c98c14f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
@@ -48,7 +48,16 @@ object RocksDBLoader extends Logging {
 
   def loadLibrary(): Unit = synchronized {
     if (exception == null) {
-      loadLibraryThread.start()
+      // SPARK-39847: if a task thread is interrupted while blocking in this loadLibrary()
+      // call then a second task thread might start a loadLibrary() call while the first
+      // call's loadLibraryThread is still running. Checking loadLibraryThread's state here
+      // ensures that the second loadLibrary() call will wait for the original call's
+      // loadLibraryThread to complete. If we didn't have this call then the second
+      // loadLibraryCall() would call start() on an already-started thread, causing a
+      // java.lang.IllegalThreadStateException error.
+      if (loadLibraryThread.getState == Thread.State.NEW) {
+        loadLibraryThread.start()
+      }
       logInfo("RocksDB library loading thread started")
       loadLibraryThread.join()
       exception.foreach(throw _)

