You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/21 21:44:00 UTC
spark git commit: [SPARK-17929][CORE] Fix deadlock when
CoarseGrainedSchedulerBackend reset
Repository: spark
Updated Branches:
refs/heads/master 7a531e305 -> c1f344f1a
[SPARK-17929][CORE] Fix deadlock when CoarseGrainedSchedulerBackend reset
## What changes were proposed in this pull request?
https://issues.apache.org/jira/browse/SPARK-17929
Currently, `CoarseGrainedSchedulerBackend.reset` holds the backend's lock for its entire body:
```
protected def reset(): Unit = synchronized {
numPendingExecutors = 0
executorsPendingToRemove.clear()
// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
executorDataMap.toMap.foreach { case (eid, _) =>
driverEndpoint.askWithRetry[Boolean](
RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
}
}
```
However, `removeExecutor` also needs the same lock (`CoarseGrainedSchedulerBackend.this.synchronized`), so calling it while `reset` holds the lock causes a deadlock.
```
private def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
logDebug(s"Asked to remove executor $executorId with reason $reason")
executorDataMap.get(executorId) match {
case Some(executorInfo) =>
// This must be synchronized because variables mutated
// in this block are read when requesting executors
val killed = CoarseGrainedSchedulerBackend.this.synchronized {
addressToExecutorId -= executorInfo.executorAddress
executorDataMap -= executorId
executorsPendingLossReason -= executorId
executorsPendingToRemove.remove(executorId).getOrElse(false)
}
...
```
## How was this patch tested?
Tested manually.
Author: w00228970 <wa...@huawei.com>
Closes #15481 from scwf/spark-17929.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1f344f1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1f344f1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1f344f1
Branch: refs/heads/master
Commit: c1f344f1a09b8834bec70c1ece30b9bff63e55ea
Parents: 7a531e3
Author: w00228970 <wa...@huawei.com>
Authored: Fri Oct 21 14:43:55 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Oct 21 14:43:55 2016 -0700
----------------------------------------------------------------------
.../cluster/CoarseGrainedSchedulerBackend.scala | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c1f344f1/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 0dae0e6..10d55c8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -386,15 +386,17 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
* Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
* be called in the yarn-client mode when AM re-registers after a failure.
* */
- protected def reset(): Unit = synchronized {
- numPendingExecutors = 0
- executorsPendingToRemove.clear()
+ protected def reset(): Unit = {
+ val executors = synchronized {
+ numPendingExecutors = 0
+ executorsPendingToRemove.clear()
+ Set() ++ executorDataMap.keys
+ }
// Remove all the lingering executors that should be removed but not yet. The reason might be
// because (1) disconnected event is not yet received; (2) executors die silently.
- executorDataMap.toMap.foreach { case (eid, _) =>
- driverEndpoint.askWithRetry[Boolean](
- RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+ executors.foreach { eid =>
+ removeExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered."))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org