Posted to commits@spark.apache.org by do...@apache.org on 2022/07/23 22:46:52 UTC
[spark] branch branch-3.2 updated: [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new f80559fc460 [SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
f80559fc460 is described below
commit f80559fc4605b3f3d9e8d7631a9a5c9e1439efe6
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Sat Jul 23 15:45:58 2022 -0700
[SPARK-39847][SS] Fix race condition in RocksDBLoader.loadLibrary() if caller thread is interrupted
### What changes were proposed in this pull request?
This PR fixes a race condition in `RocksDBLoader.loadLibrary()`, which can occur if the thread which calls that method is interrupted.
One of our jobs experienced a failure in `RocksDBLoader`:
```
Caused by: java.lang.IllegalThreadStateException
at java.lang.Thread.start(Thread.java:708)
at org.apache.spark.sql.execution.streaming.state.RocksDBLoader$.loadLibrary(RocksDBLoader.scala:51)
```
After investigation, we determined that this was due to task cancellation/interruption: if the task that starts loading the RocksDB library is interrupted, another thread may begin a load and crash with the thread state exception:
- Although the `loadLibraryThread` child thread is uninterruptible, the task thread that calls `loadLibrary()` is still interruptible.
- Suppose we have two tasks, A and B, both of which call `RocksDBLoader.loadLibrary()`.
- Task A wins the race to perform the load: it enters the `synchronized` block in `loadLibrary()`, starts the `loadLibraryThread`, and then blocks in the `loadLibraryThread.join()` call.
- If Task A is interrupted, `join()` throws an `InterruptedException` and Task A exits the synchronized `loadLibrary()` block, while the `loadLibraryThread` it started keeps running.
- Task B then enters the synchronized block of its own `loadLibrary()` call and sees `exception == null`, because the `loadLibraryThread` started by Task A is still running. Task B therefore calls `loadLibraryThread.start()` and hits the thread state error, because it is trying to start an already-started thread.
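The crux of the race is that a `java.lang.Thread` object can be started at most once: calling `start()` a second time throws `IllegalThreadStateException`, even while the thread is still running. The minimal sketch below reproduces that failure mode in isolation. `DoubleStartDemo` and its latch-blocked worker are hypothetical stand-ins for illustration, not Spark code; the worker simply plays the role of a slow `loadLibraryThread`.

```scala
import java.util.concurrent.CountDownLatch

object DoubleStartDemo {
  /** Returns true if calling start() a second time on a running thread throws. */
  def secondStartThrows(): Boolean = {
    val release = new CountDownLatch(1)
    // Hypothetical stand-in for loadLibraryThread: stays alive until released.
    val worker = new Thread("worker") {
      override def run(): Unit = release.await()
    }
    worker.start() // first caller ("Task A") starts the thread
    val threw =
      try {
        worker.start() // second caller ("Task B") tries to start it again
        false
      } catch {
        case _: IllegalThreadStateException => true
      }
    release.countDown() // let the worker finish
    worker.join()
    threw
  }
}

println(DoubleStartDemo.secondStartThrows()) // prints true
```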
This PR fixes this issue by adding code to check `loadLibraryThread`'s state before calling `start()`: if the thread has already been started then we will skip the `start()` call and proceed directly to the `join()`. I also modified the logging so that we can detect when this case occurs.
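A rough model of the fixed control flow might look like the sketch below. It is not the real `RocksDBLoader`: `GuardedLoader`, its `loadCount` counter, and the `Thread.sleep(200)` standing in for the native library load are all hypothetical, and it uses a plain `Thread` rather than Spark's uninterruptible thread. Task A is interrupted while blocked in `join()`. The second caller then finds the loading thread past the `NEW` state, so it only waits for that thread instead of calling `start()` again.

```scala
object GuardedLoader {
  // null = load not attempted yet; None = loaded successfully; Some(e) = load failed.
  @volatile var exception: Option[Throwable] = null
  // Hypothetical counter: how many times the simulated load body actually ran.
  @volatile var loadCount: Int = 0

  private lazy val loadThread: Thread = new Thread("GuardedLoader-load") {
    override def run(): Unit = {
      try {
        Thread.sleep(200) // hypothetical stand-in for a slow native library load
        loadCount += 1
        exception = None
      } catch {
        case e: Throwable => exception = Some(e)
      }
    }
  }

  def load(): Unit = synchronized {
    if (exception == null) {
      // Only start the thread if nobody has started it yet. Otherwise an earlier
      // caller started it and was interrupted, so we just wait for it below.
      if (loadThread.getState == Thread.State.NEW) {
        loadThread.start()
      }
      loadThread.join() // throws InterruptedException if the caller is interrupted
      exception.foreach(e => throw e)
    } else {
      exception.foreach(e => throw e)
    }
  }
}

// "Task A": starts the load, then gets interrupted while blocked in join().
val taskA = new Thread("task A") {
  override def run(): Unit =
    try GuardedLoader.load()
    catch { case _: InterruptedException => () } // task A gives up on interruption
}
taskA.start()
Thread.sleep(50) // give task A time to start the load and block in join()
taskA.interrupt()
taskA.join()

// "Task B": the loading thread is no longer NEW, so this call only waits for it.
GuardedLoader.load()
println(s"${GuardedLoader.exception}, loads = ${GuardedLoader.loadCount}") // prints "None, loads = 1"
```

Without the `getState == Thread.State.NEW` guard, the second `load()` call would throw `IllegalThreadStateException`, matching the stack trace quoted earlier in this message.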
### Why are the changes needed?
Fix a bug that can lead to task or job failures.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
I reproduced the original race condition by adding a `Thread.sleep(10000)` to `loadLibraryThread.run()` (so that it wouldn't complete instantly) and then running the following test:
```scala
test("multi-threaded RocksDBLoader calls with interruption") {
  val taskThread = new Thread("interruptible Task Thread 1") {
    override def run(): Unit = {
      RocksDBLoader.loadLibrary()
    }
  }
  taskThread.start()
  // Give the thread time to enter the `loadLibrary()` call:
  Thread.sleep(1000)
  taskThread.interrupt()
  // Check that the load hasn't finished:
  assert(RocksDBLoader.exception == null)
  assert(RocksDBLoader.loadLibraryThread.getState != Thread.State.NEW)
  // Simulate the second task thread starting the load:
  RocksDBLoader.loadLibrary()
  // The load should finish successfully:
  assert(RocksDBLoader.exception.isEmpty)
}
```
This test failed prior to my changes and succeeds afterwards.
I don't want to actually commit this test because I'm concerned about flakiness and false negatives: to ensure that the test would have failed before my change, we would need to carefully control the thread interleaving. This code rarely changes and is relatively simple, so I think the ROI on writing and committing a reliable test is low.
Closes #37260 from JoshRosen/rocksdbloader-fix.
Authored-by: Josh Rosen <jo...@databricks.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 9cee1bb2527a496943ffedbd935dc737246a2d89)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../spark/sql/execution/streaming/state/RocksDBLoader.scala | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
index cc518192437..02c98c14f86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBLoader.scala
@@ -48,7 +48,16 @@ object RocksDBLoader extends Logging {
def loadLibrary(): Unit = synchronized {
if (exception == null) {
- loadLibraryThread.start()
+ // SPARK-39847: if a task thread is interrupted while blocking in this loadLibrary()
+ // call then a second task thread might start a loadLibrary() call while the first
+ // call's loadLibraryThread is still running. Checking loadLibraryThread's state here
+ // ensures that the second loadLibrary() call will wait for the original call's
+ // loadLibraryThread to complete. If we didn't have this call then the second
+ // loadLibraryCall() would call start() on an already-started thread, causing a
+ // java.lang.IllegalThreadStateException error.
+ if (loadLibraryThread.getState == Thread.State.NEW) {
+ loadLibraryThread.start()
+ }
logInfo("RocksDB library loading thread started")
loadLibraryThread.join()
exception.foreach(throw _)