You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/03 06:42:20 UTC
spark git commit: [SPARK-5219][Core] Add locks to avoid scheduling
race conditions
Repository: spark
Updated Branches:
refs/heads/master 60f67e7a1 -> c306555f4
[SPARK-5219][Core] Add locks to avoid scheduling race conditions
Author: zsxwing <zs...@gmail.com>
Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits:
36a8b4e [zsxwing] Add locks to avoid race conditions
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c306555f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c306555f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c306555f
Branch: refs/heads/master
Commit: c306555f491e45ef870f58938af397f9ec5f166a
Parents: 60f67e7
Author: zsxwing <zs...@gmail.com>
Authored: Mon Feb 2 21:42:18 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Feb 2 21:42:18 2015 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
.../main/scala/org/apache/spark/scheduler/TaskSetManager.scala | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c306555f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 33a7aae..79f84e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
}
- def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
+ def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
taskSetManager.handleTaskGettingResult(tid)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/c306555f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5c94c6b..97c22fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -542,7 +542,7 @@ private[spark] class TaskSetManager(
/**
* Check whether has enough quota to fetch the result with `size` bytes
*/
- def canFetchMoreResults(size: Long): Boolean = synchronized {
+ def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
totalResultSize += size
calculatedTasks += 1
if (maxResultSize > 0 && totalResultSize > maxResultSize) {
@@ -671,7 +671,7 @@ private[spark] class TaskSetManager(
maybeFinishTaskSet()
}
- def abort(message: String) {
+ def abort(message: String): Unit = sched.synchronized {
// TODO: Kill running tasks if we were not terminated due to a Mesos error
sched.dagScheduler.taskSetFailed(taskSet, message)
isZombie = true
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org