You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2016/11/30 19:47:21 UTC
spark git commit: [SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
Repository: spark
Updated Branches:
refs/heads/master bc95ea0be -> c51c77259
[SPARK-18640] Add synchronization to TaskScheduler.runningTasksByExecutors
## What changes were proposed in this pull request?
The method `TaskSchedulerImpl.runningTasksByExecutors()` accesses the mutable `executorIdToRunningTaskIds` map without proper synchronization. In addition, as markhamstra pointed out in #15986, the signature's use of parentheses is a little odd given that this is a pure getter method.
This patch fixes both issues.
## How was this patch tested?
Covered by existing tests.
Author: Josh Rosen <jo...@databricks.com>
Closes #16073 from JoshRosen/runningTasksByExecutors-thread-safety.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c51c7725
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c51c7725
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c51c7725
Branch: refs/heads/master
Commit: c51c7725944d60738e2bac3e11f6aea74812905c
Parents: bc95ea0
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed Nov 30 14:47:41 2016 -0500
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Nov 30 14:47:41 2016 -0500
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/SparkStatusTracker.scala | 2 +-
.../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
.../org/apache/spark/scheduler/TaskSchedulerImplSuite.scala | 4 ++--
3 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c51c7725/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 52c4656..22a553e 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -112,7 +112,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
def getExecutorInfos: Array[SparkExecutorInfo] = {
val executorIdToRunningTasks: Map[String, Int] =
- sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+ sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
sc.getExecutorStorageStatus.map { status =>
val bmId = status.blockManagerId
http://git-wip-us.apache.org/repos/asf/spark/blob/c51c7725/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 67446da..b03cfe4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -96,7 +96,7 @@ private[spark] class TaskSchedulerImpl(
// IDs of the tasks running on each executor
private val executorIdToRunningTaskIds = new HashMap[String, HashSet[Long]]
- def runningTasksByExecutors(): Map[String, Int] = {
+ def runningTasksByExecutors: Map[String, Int] = synchronized {
executorIdToRunningTaskIds.toMap.mapValues(_.size)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c51c7725/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 59bea27..a0b6268 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -678,7 +678,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
- assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+ assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
}
test("if a task finishes with TaskState.LOST its executor is marked as dead") {
@@ -709,7 +709,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
// Check that state associated with the lost task attempt is cleaned up:
assert(taskScheduler.taskIdToExecutorId.isEmpty)
assert(taskScheduler.taskIdToTaskSetManager.isEmpty)
- assert(taskScheduler.runningTasksByExecutors().get("executor0").isEmpty)
+ assert(taskScheduler.runningTasksByExecutors.get("executor0").isEmpty)
// Check that the executor has been marked as dead
assert(!taskScheduler.isExecutorAlive("executor0"))
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org