You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ad...@apache.org on 2014/05/14 09:54:45 UTC

git commit: [SPARK-1769] Executor loss causes NPE race condition

Repository: spark
Updated Branches:
  refs/heads/master 54ae8328b -> 69f750228


[SPARK-1769] Executor loss causes NPE race condition

This PR replaces the Schedulable data structures in Pool.scala with thread-safe ones from java. Note that Scala's `with SynchronizedBuffer` trait is soon to be deprecated in 2.11 because it is ["inherently unreliable"](http://www.scala-lang.org/api/2.11.0/index.html#scala.collection.mutable.SynchronizedBuffer). We should slowly drift away from `SynchronizedBuffer` in other places too.

Note that this PR introduces an API-breaking change; `sc.getAllPools` now returns an Array rather than an ArrayBuffer. This is because we want this method to return an immutable copy rather than one may potentially confuse the user if they try to modify the copy, which takes no effect on the original data structure.

Author: Andrew Or <an...@gmail.com>

Closes #762 from andrewor14/pool-npe and squashes the following commits:

383e739 [Andrew Or] JavaConverters -> JavaConversions
3f32981 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe
769be19 [Andrew Or] Assorted minor changes
2189247 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe
05ad9e9 [Andrew Or] Fix test - contains is not the same as containsKey
0921ea0 [Andrew Or] var -> val
07d720c [Andrew Or] Synchronize Schedulable data structures


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69f75022
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69f75022
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69f75022

Branch: refs/heads/master
Commit: 69f750228f3ec8537a93da08e712596fa8004143
Parents: 54ae832
Author: Andrew Or <an...@gmail.com>
Authored: Wed May 14 00:54:33 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Wed May 14 00:54:33 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 20 ++++++++-----
 .../scala/org/apache/spark/scheduler/Pool.scala | 31 ++++++++++----------
 .../apache/spark/scheduler/Schedulable.scala    |  6 ++--
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 +-
 .../scheduler/TaskSchedulerImplSuite.scala      |  2 +-
 5 files changed, 35 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/69f75022/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index c43b4fd..032b3d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark
 
+import scala.language.implicitConversions
+
 import java.io._
 import java.net.URI
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.{Properties, UUID}
 import java.util.UUID.randomUUID
 import scala.collection.{Map, Set}
+import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.language.implicitConversions
+import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   *  Return pools for fair scheduler
-   *  TODO(xiajunluan): We should take nested pools into account
+   * :: DeveloperApi ::
+   * Return pools for fair scheduler
    */
-  def getAllPools: ArrayBuffer[Schedulable] = {
-    taskScheduler.rootPool.schedulableQueue
+  @DeveloperApi
+  def getAllPools: Seq[Schedulable] = {
+    // TODO(xiajunluan): We should take nested pools into account
+    taskScheduler.rootPool.schedulableQueue.toSeq
   }
 
   /**
+   * :: DeveloperApi ::
    * Return the pool associated with the given name, if one exists
    */
+  @DeveloperApi
   def getPoolForName(pool: String): Option[Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
+    Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/69f75022/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 187672c..174b732 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
+
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
 
 import org.apache.spark.Logging
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -35,18 +37,15 @@ private[spark] class Pool(
   extends Schedulable
   with Logging {
 
-  var schedulableQueue = new ArrayBuffer[Schedulable]
-  var schedulableNameToSchedulable = new HashMap[String, Schedulable]
-
+  val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
+  val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
   var runningTasks = 0
-
   var priority = 0
 
   // A pool's stage id is used to break the tie in scheduling.
   var stageId = -1
-
   var name = poolName
   var parent: Pool = null
 
@@ -60,19 +59,20 @@ private[spark] class Pool(
   }
 
   override def addSchedulable(schedulable: Schedulable) {
-    schedulableQueue += schedulable
-    schedulableNameToSchedulable(schedulable.name) = schedulable
+    require(schedulable != null)
+    schedulableQueue.add(schedulable)
+    schedulableNameToSchedulable.put(schedulable.name, schedulable)
     schedulable.parent = this
   }
 
   override def removeSchedulable(schedulable: Schedulable) {
-    schedulableQueue -= schedulable
-    schedulableNameToSchedulable -= schedulable.name
+    schedulableQueue.remove(schedulable)
+    schedulableNameToSchedulable.remove(schedulable.name)
   }
 
   override def getSchedulableByName(schedulableName: String): Schedulable = {
-    if (schedulableNameToSchedulable.contains(schedulableName)) {
-      return schedulableNameToSchedulable(schedulableName)
+    if (schedulableNameToSchedulable.containsKey(schedulableName)) {
+      return schedulableNameToSchedulable.get(schedulableName)
     }
     for (schedulable <- schedulableQueue) {
       val sched = schedulable.getSchedulableByName(schedulableName)
@@ -95,11 +95,12 @@ private[spark] class Pool(
     shouldRevive
   }
 
-  override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
+  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
-    val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
+    val sortedSchedulableQueue =
+      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
-      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
+      sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
     sortedTaskSetQueue
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/69f75022/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index ed24eb6..a87ef03 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import java.util.concurrent.ConcurrentLinkedQueue
+
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 private[spark] trait Schedulable {
   var parent: Pool
   // child queues
-  def schedulableQueue: ArrayBuffer[Schedulable]
+  def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
@@ -42,5 +44,5 @@ private[spark] trait Schedulable {
   def getSchedulableByName(name: String): Schedulable
   def executorLost(executorId: String, host: String): Unit
   def checkSpeculatableTasks(): Boolean
-  def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
+  def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/69f75022/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5a68f38..ffd1d94 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
     // Build a list of tasks to assign to each worker.
     val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
-    val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+    val sortedTaskSets = rootPool.getSortedTaskSetQueue
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))

http://git-wip-us.apache.org/repos/asf/spark/blob/69f75022/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index a8b605c..7532da8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
   }
 
   def resourceOffer(rootPool: Pool): Int = {
-    val taskSetQueue = rootPool.getSortedTaskSetQueue()
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
     /* Just for Test*/
     for (manager <- taskSetQueue) {
        logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(