You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2019/03/18 15:34:03 UTC
[spark] branch master updated: [SPARK-27112][CORE] : Create a resource ordering between threads to resolve the deadlocks encountered …
This is an automated email from the ASF dual-hosted git repository.
irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 7043aee [SPARK-27112][CORE] : Create a resource ordering between threads to resolve the deadlocks encountered …
7043aee is described below
commit 7043aee1ba95e92e1cbd0ebafcc5b09b69ee3082
Author: pgandhi <pg...@verizonmedia.com>
AuthorDate: Mon Mar 18 10:33:51 2019 -0500
[SPARK-27112][CORE]: Create a resource ordering between threads to resolve the deadlocks encountered when trying to kill executors due to either dynamic allocation or blacklisting
## What changes were proposed in this pull request?
There are two deadlocks caused by the interplay between three different threads:
- **task-result-getter thread**
- **spark-dynamic-executor-allocation thread**
- **dispatcher-event-loop thread (`makeOffers()`)**
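These threads could acquire the locks on `TaskSchedulerImpl` and `CoarseGrainedSchedulerBackend` in inconsistent orders. As a result, one thread could hold one lock while waiting for the other. The fix enforces a lock-ordering constraint: both `makeOffers()` and the `killExecutors()` method acquire the lock on `TaskSchedulerImpl` before the lock on `CoarseGrainedSchedulerBackend`. This establishes a consistent resource ordering between the threads and thus fixes the deadlocks.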
## How was this patch tested?
Manual tests.
Closes #24072 from pgandhi999/SPARK-27112-2.
Authored-by: pgandhi <pg...@verizonmedia.com>
Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
.../scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 13 ++++++++++---
1 file changed, 10 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dc0f21c..808ef08 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -258,7 +258,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
- val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ val taskDescs = withLock {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
@@ -284,7 +284,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Make fake resource offers on just one executor
private def makeOffers(executorId: String) {
// Make sure no executor is killed while some task is launching on it
- val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+ val taskDescs = withLock {
// Filter out executors under killing
if (executorIsAlive(executorId)) {
val executorData = executorDataMap(executorId)
@@ -631,7 +631,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
force: Boolean): Seq[String] = {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
- val response = synchronized {
+ val response = withLock {
val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
unknownExecutors.foreach { id =>
logWarning(s"Executor to kill $id does not exist!")
@@ -730,6 +730,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
+ // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+ // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
+ // the deadlock issue exposed in SPARK-27112
+ private def withLock[T](fn: => T): T = scheduler.synchronized {
+ CoarseGrainedSchedulerBackend.this.synchronized { fn }
+ }
+
}
private[spark] object CoarseGrainedSchedulerBackend {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org