You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2017/02/10 00:05:59 UTC

spark git commit: [SPARK-19263] Fix race in SchedulerIntegrationSuite.

Repository: spark
Updated Branches:
  refs/heads/master 303f00a4b -> fd6c3a0b1


[SPARK-19263] Fix race in SchedulerIntegrationSuite.

## What changes were proposed in this pull request?

All the process of offering resource and generating `TaskDescription` should be guarded by taskScheduler.synchronized in `reviveOffers`, otherwise there is race condition.

## How was this patch tested?

Existing unit tests.

Author: jinxing <ji...@meituan.com>

Closes #16831 from jinxing64/SPARK-19263-FixRaceInTest.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fd6c3a0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fd6c3a0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fd6c3a0b

Branch: refs/heads/master
Commit: fd6c3a0b10ce43a56df845ba66d160b77f02e576
Parents: 303f00a
Author: jinxing <ji...@meituan.com>
Authored: Thu Feb 9 16:05:44 2017 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Thu Feb 9 16:05:44 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/SchedulerIntegrationSuite.scala   | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fd6c3a0b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index 2ba63da..398ac3d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -391,17 +391,17 @@ private[spark] abstract class MockBackend(
    * scheduling.
    */
   override def reviveOffers(): Unit = {
-    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
-    // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
-    // tests from introducing a race if they need it
-    val newTasks = taskScheduler.synchronized {
-      newTaskDescriptions.map { taskDescription =>
+    // Need a lock on the entire scheduler to protect freeCores -- otherwise, multiple threads
+    // may make offers at the same time, though they are using the same set of freeCores.
+    taskScheduler.synchronized {
+      val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
+      // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
+      // tests from introducing a race if they need it.
+      val newTasks = newTaskDescriptions.map { taskDescription =>
         val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
         val task = taskSet.tasks(taskDescription.index)
         (taskDescription, task)
       }
-    }
-    synchronized {
       newTasks.foreach { case (taskDescription, _) =>
         executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org