You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2014/07/16 19:24:25 UTC

git commit: SPARK-2277: make TaskScheduler track hosts on rack

Repository: spark
Updated Branches:
  refs/heads/master efc452a16 -> 33e64ecac


SPARK-2277: make TaskScheduler track hosts on rack

Hi mateiz, I've created [SPARK-2277](https://issues.apache.org/jira/browse/SPARK-2277) to make TaskScheduler track hosts on each rack. Please help to review, thanks.

Author: Rui Li <ru...@intel.com>

Closes #1212 from lirui-intel/trackHostOnRack and squashes the following commits:

2b4bd0f [Rui Li] SPARK-2277: refine UT
fbde838 [Rui Li] SPARK-2277: add UT
7bbe658 [Rui Li] SPARK-2277: rename the method
5e4ef62 [Rui Li] SPARK-2277: remove unnecessary import
79ac750 [Rui Li] SPARK-2277: make TaskScheduler track hosts on rack


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33e64eca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33e64eca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33e64eca

Branch: refs/heads/master
Commit: 33e64ecacbc44567f9cba2644a30a118653ea5fa
Parents: efc452a
Author: Rui Li <ru...@intel.com>
Authored: Wed Jul 16 22:53:37 2014 +0530
Committer: mridulm <mr...@apache.org>
Committed: Wed Jul 16 22:53:37 2014 +0530

----------------------------------------------------------------------
 .../spark/scheduler/TaskSchedulerImpl.scala     | 15 +++++
 .../apache/spark/scheduler/TaskSetManager.scala | 10 +++-
 .../spark/scheduler/TaskSetManagerSuite.scala   | 63 +++++++++++++++++++-
 3 files changed, 83 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33e64eca/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 4b6d6da..be3673c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -88,6 +88,8 @@ private[spark] class TaskSchedulerImpl(
   // in turn is used to decide when we can attain data locality on a given host
   private val executorsByHost = new HashMap[String, HashSet[String]]
 
+  protected val hostsByRack = new HashMap[String, HashSet[String]]
+
   private val executorIdToHost = new HashMap[String, String]
 
   // Listener object to pass upcalls into
@@ -223,6 +225,9 @@ private[spark] class TaskSchedulerImpl(
         executorAdded(o.executorId, o.host)
         newExecAvail = true
       }
+      for (rack <- getRackForHost(o.host)) {
+        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
+      }
     }
 
     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
@@ -418,6 +423,12 @@ private[spark] class TaskSchedulerImpl(
     execs -= executorId
     if (execs.isEmpty) {
       executorsByHost -= host
+      for (rack <- getRackForHost(host); hosts <- hostsByRack.get(rack)) {
+        hosts -= host
+        if (hosts.isEmpty) {
+          hostsByRack -= rack
+        }
+      }
     }
     executorIdToHost -= executorId
     rootPool.executorLost(executorId, host)
@@ -435,6 +446,10 @@ private[spark] class TaskSchedulerImpl(
     executorsByHost.contains(host)
   }
 
+  def hasHostAliveOnRack(rack: String): Boolean = synchronized {
+    hostsByRack.contains(rack)
+  }
+
   def isExecutorAlive(execId: String): Boolean = synchronized {
     activeExecutorIds.contains(execId)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/33e64eca/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 059cc90..3bdc71d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -191,7 +191,9 @@ private[spark] class TaskSetManager(
       addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
       for (rack <- sched.getRackForHost(loc.host)) {
         addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
-        hadAliveLocations = true
+        if(sched.hasHostAliveOnRack(rack)){
+          hadAliveLocations = true
+        }
       }
     }
 
@@ -748,7 +750,8 @@ private[spark] class TaskSetManager(
         pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
       levels += NODE_LOCAL
     }
-    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
+    if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
+        pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
       levels += RACK_LOCAL
     }
     levels += ANY
@@ -761,7 +764,8 @@ private[spark] class TaskSetManager(
     def newLocAvail(index: Int): Boolean = {
       for (loc <- tasks(index).preferredLocations) {
         if (sched.hasExecutorsAliveOnHost(loc.host) ||
-            sched.getRackForHost(loc.host).isDefined) {
+           (sched.getRackForHost(loc.host).isDefined &&
+           sched.hasHostAliveOnRack(sched.getRackForHost(loc.host).get))) {
           return true
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/33e64eca/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 9ff2a48..86b443b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -54,6 +54,23 @@ class FakeDAGScheduler(sc: SparkContext, taskScheduler: FakeTaskScheduler)
   }
 }
 
+// Get the rack for a given host
+object FakeRackUtil {
+  private val hostToRack = new mutable.HashMap[String, String]()
+
+  def cleanUp() {
+    hostToRack.clear()
+  }
+
+  def assignHostToRack(host: String, rack: String) {
+    hostToRack(host) = rack
+  }
+
+  def getRackForHost(host: String) = {
+    hostToRack.get(host)
+  }
+}
+
 /**
  * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
@@ -69,6 +86,9 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
   val taskSetsFailed = new ArrayBuffer[String]
 
   val executors = new mutable.HashMap[String, String] ++ liveExecutors
+  for ((execId, host) <- liveExecutors; rack <- getRackForHost(host)) {
+    hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+  }
 
   dagScheduler = new FakeDAGScheduler(sc, this)
 
@@ -82,7 +102,12 @@ class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* ex
 
   def addExecutor(execId: String, host: String) {
     executors.put(execId, host)
+    for (rack <- getRackForHost(host)) {
+      hostsByRack.getOrElseUpdate(rack, new mutable.HashSet[String]()) += host
+    }
   }
+
+  override def getRackForHost(value: String): Option[String] = FakeRackUtil.getRackForHost(value)
 }
 
 /**
@@ -419,6 +444,9 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("new executors get added") {
+    // Assign host2 to rack2
+    FakeRackUtil.cleanUp()
+    FakeRackUtil.assignHostToRack("host2", "rack2")
     sc = new SparkContext("local", "test")
     val sched = new FakeTaskScheduler(sc)
     val taskSet = FakeTask.createTaskSet(4,
@@ -444,8 +472,39 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
     manager.executorAdded()
     // No-pref list now only contains task 3
     assert(manager.pendingTasksWithNoPrefs.size === 1)
-    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL and ANY
-    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, ANY)))
+    // Valid locality should contain PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL and ANY
+    assert(manager.myLocalityLevels.sameElements(
+      Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+  }
+
+  test("test RACK_LOCAL tasks") {
+    FakeRackUtil.cleanUp()
+    // Assign host1 to rack1
+    FakeRackUtil.assignHostToRack("host1", "rack1")
+    // Assign host2 to rack1
+    FakeRackUtil.assignHostToRack("host2", "rack1")
+    // Assign host3 to rack2
+    FakeRackUtil.assignHostToRack("host3", "rack2")
+    sc = new SparkContext("local", "test")
+    val sched = new FakeTaskScheduler(sc,
+      ("execA", "host1"), ("execB", "host2"), ("execC", "host3"))
+    val taskSet = FakeTask.createTaskSet(2,
+      Seq(TaskLocation("host1", "execA")),
+      Seq(TaskLocation("host1", "execA")))
+    val clock = new FakeClock
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
+
+    assert(manager.myLocalityLevels.sameElements(Array(PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY)))
+    // Set allowed locality to ANY
+    clock.advance(LOCALITY_WAIT * 3)
+    // Offer host3
+    // No task is scheduled if we restrict locality to RACK_LOCAL
+    assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
+    // Task 0 can be scheduled with ANY
+    assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0)
+    // Offer host2
+    // Task 1 can be scheduled with RACK_LOCAL
+    assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
   }
 
   test("do not emit warning when serialized task is small") {