You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/03 06:42:20 UTC

spark git commit: [SPARK-5219][Core] Add locks to avoid scheduling race conditions

Repository: spark
Updated Branches:
  refs/heads/master 60f67e7a1 -> c306555f4


[SPARK-5219][Core] Add locks to avoid scheduling race conditions

Author: zsxwing <zs...@gmail.com>

Closes #4019 from zsxwing/SPARK-5219 and squashes the following commits:

36a8b4e [zsxwing] Add locks to avoid race conditions


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c306555f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c306555f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c306555f

Branch: refs/heads/master
Commit: c306555f491e45ef870f58938af397f9ec5f166a
Parents: 60f67e7
Author: zsxwing <zs...@gmail.com>
Authored: Mon Feb 2 21:42:18 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Feb 2 21:42:18 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala     | 2 +-
 .../main/scala/org/apache/spark/scheduler/TaskSetManager.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c306555f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 33a7aae..79f84e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -361,7 +361,7 @@ private[spark] class TaskSchedulerImpl(
     dagScheduler.executorHeartbeatReceived(execId, metricsWithStageIds, blockManagerId)
   }
 
-  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long) {
+  def handleTaskGettingResult(taskSetManager: TaskSetManager, tid: Long): Unit = synchronized {
     taskSetManager.handleTaskGettingResult(tid)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c306555f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 5c94c6b..97c22fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -542,7 +542,7 @@ private[spark] class TaskSetManager(
   /**
    * Check whether has enough quota to fetch the result with `size` bytes
    */
-  def canFetchMoreResults(size: Long): Boolean = synchronized {
+  def canFetchMoreResults(size: Long): Boolean = sched.synchronized {
     totalResultSize += size
     calculatedTasks += 1
     if (maxResultSize > 0 && totalResultSize > maxResultSize) {
@@ -671,7 +671,7 @@ private[spark] class TaskSetManager(
     maybeFinishTaskSet()
   }
 
-  def abort(message: String) {
+  def abort(message: String): Unit = sched.synchronized {
     // TODO: Kill running tasks if we were not terminated due to a Mesos error
     sched.dagScheduler.taskSetFailed(taskSet, message)
     isZombie = true


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org