Posted to commits@spark.apache.org by ka...@apache.org on 2015/06/11 00:07:12 UTC

spark git commit: [SPARK-2774] Set preferred locations for reduce tasks

Repository: spark
Updated Branches:
  refs/heads/master 5014d0ed7 -> 96a7c888d


[SPARK-2774] Set preferred locations for reduce tasks

Set preferred locations for reduce tasks.
The basic design is that we maintain a map from reducerId to a list of (sizes, locations) for each
shuffle. We then set the preferred locations to be any machines that have 20% or more of the output
that needs to be read by the reduce task. This will result in at most 5 preferred locations for
each reduce task.

Selecting the preferred locations involves O(# map tasks * # reduce tasks) computation, so we
restrict this feature to cases where we have fewer than 1000 map tasks and 1000 reduce tasks.
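
As a minimal sketch of the selection rule (hypothetical standalone Scala, not the patch
itself): given the per-host output sizes destined for one reducer, keep every host holding
at least the threshold fraction of the total.

  // Hedged sketch of the selection rule; names here are hypothetical, the real
  // implementation is MapOutputTrackerMaster.getLocationsWithLargestOutputs below.
  def preferredHosts(sizesByHost: Map[String, Long], threshold: Double = 0.2): Seq[String] = {
    val total = sizesByHost.values.sum
    if (total == 0L) Seq.empty
    else sizesByHost.collect {
      case (host, size) if size.toDouble / total >= threshold => host
    }.toSeq
  }

  // A threshold of 0.2 caps the result at 1 / 0.2 = 5 hosts, since five shares
  // of at least 20% each already account for all of the output.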

Author: Shivaram Venkataraman <sh...@cs.berkeley.edu>

Closes #6652 from shivaram/reduce-locations and squashes the following commits:

492e25e [Shivaram Venkataraman] Remove unused import
2ef2d39 [Shivaram Venkataraman] Address code review comments
897a914 [Shivaram Venkataraman] Remove unused hash map
f5be578 [Shivaram Venkataraman] Use fraction of map outputs to determine locations. Also removes caching of preferred locations to make the API cleaner
68bc29e [Shivaram Venkataraman] Fix line length
1090b58 [Shivaram Venkataraman] Change flag name
77ce7d8 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
e5d56bd [Shivaram Venkataraman] Add flag to turn off locality for shuffle deps
6cfae98 [Shivaram Venkataraman] Filter out zero blocks, rename variables
9d5831a [Shivaram Venkataraman] Address some more comments
8e31266 [Shivaram Venkataraman] Fix style
0df3180 [Shivaram Venkataraman] Address code review comments
e7d5449 [Shivaram Venkataraman] Fix merge issues
ad7cb53 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
df14cee [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
5093aea [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
0171d3c [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
bc4dfd6 [Shivaram Venkataraman] Merge branch 'master' of https://github.com/apache/spark into reduce-locations
774751b [Shivaram Venkataraman] Fix bug introduced by line length adjustment
34d0283 [Shivaram Venkataraman] Fix style issues
3b464b7 [Shivaram Venkataraman] Set preferred locations for reduce tasks. This is another attempt at #1697, addressing some of the earlier concerns. It adds a couple of thresholds based on the number of map and reduce tasks beyond which we don't use preferred locations for reduce tasks.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/96a7c888
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/96a7c888
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/96a7c888

Branch: refs/heads/master
Commit: 96a7c888d806adfdb2c722025a1079ed7eaa2052
Parents: 5014d0e
Author: Shivaram Venkataraman <sh...@cs.berkeley.edu>
Authored: Wed Jun 10 15:03:40 2015 -0700
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Jun 10 15:04:38 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     | 49 ++++++++++++-
 .../apache/spark/scheduler/DAGScheduler.scala   | 37 +++++++++-
 .../apache/spark/MapOutputTrackerSuite.scala    | 35 +++++++++
 .../spark/scheduler/DAGSchedulerSuite.scala     | 76 +++++++++++++++-----
 4 files changed, 177 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/96a7c888/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 0184228..862ffe8 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
-import scala.collection.mutable.{HashSet, Map}
+import scala.collection.mutable.{HashMap, HashSet, Map}
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -284,6 +284,53 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf)
     cachedSerializedStatuses.contains(shuffleId) || mapStatuses.contains(shuffleId)
   }
 
+  /**
+   * Return a list of locations that each have a fraction of the map output greater than the
+   * specified threshold.
+   *
+   * @param shuffleId id of the shuffle
+   * @param reducerId id of the reduce task
+   * @param numReducers total number of reducers in the shuffle
+   * @param fractionThreshold fraction of total map output size that a location must have
+   *                          for it to be considered large.
+   *
+   * This method is not thread-safe.
+   */
+  def getLocationsWithLargestOutputs(
+      shuffleId: Int,
+      reducerId: Int,
+      numReducers: Int,
+      fractionThreshold: Double)
+    : Option[Array[BlockManagerId]] = {
+
+    if (mapStatuses.contains(shuffleId)) {
+      val statuses = mapStatuses(shuffleId)
+      if (statuses.nonEmpty) {
+        // HashMap to add up sizes of all blocks at the same location
+        val locs = new HashMap[BlockManagerId, Long]
+        var totalOutputSize = 0L
+        var mapIdx = 0
+        while (mapIdx < statuses.length) {
+          val status = statuses(mapIdx)
+          val blockSize = status.getSizeForBlock(reducerId)
+          if (blockSize > 0) {
+            locs(status.location) = locs.getOrElse(status.location, 0L) + blockSize
+            totalOutputSize += blockSize
+          }
+          mapIdx = mapIdx + 1
+        }
+        val topLocs = locs.filter { case (loc, size) =>
+          size.toDouble / totalOutputSize >= fractionThreshold
+        }
+        // Return if we have any locations which satisfy the required threshold
+        if (topLocs.nonEmpty) {
+          return Some(topLocs.map(_._1).toArray)
+        }
+      }
+    }
+    None
+  }
+
   def incrementEpoch() {
     epochLock.synchronized {
       epoch += 1
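
A usage sketch of the new method (assuming a MapOutputTrackerMaster, here called
tracker, that already has map outputs registered for shuffle 10, as in the test
suite further down):

  import org.apache.spark.storage.BlockManagerId

  // Sketch only: `tracker` and the shuffle id are assumed, mirroring the tests below.
  val topLocs: Option[Array[BlockManagerId]] =
    tracker.getLocationsWithLargestOutputs(
      shuffleId = 10, reducerId = 0, numReducers = 1, fractionThreshold = 0.2)
  topLocs.foreach { locs =>
    locs.foreach(loc => println(s"${loc.host} holds at least 20% of reducer 0's input"))
  }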

http://git-wip-us.apache.org/repos/asf/spark/blob/96a7c888/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 75a567f..aea6674 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -137,6 +137,22 @@ class DAGScheduler(
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
 
+  // Flag to control if reduce tasks are assigned preferred locations
+  private val shuffleLocalityEnabled =
+    sc.getConf.getBoolean("spark.shuffle.reduceLocality.enabled", true)
+  // Number of map and reduce tasks above which we do not assign preferred locations based on
+  // map output sizes. We limit the size of jobs for which we assign preferred locations, as
+  // computing the top locations by size becomes expensive.
+  private[this] val SHUFFLE_PREF_MAP_THRESHOLD = 1000
+  // NOTE: This should be less than 2000 as we use HighlyCompressedMapStatus beyond that
+  private[this] val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
+
+  // Fraction of total map output that must be at a location for it to be considered a
+  // preferred location for a reduce task.
+  // Making this larger will focus on fewer locations where most data can be read locally, but
+  // may lead to more delay in scheduling if those locations are busy.
+  private[scheduler] val REDUCER_PREF_LOCS_FRACTION = 0.2
+
   // Called by TaskScheduler to report task's starting.
   def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     eventProcessLoop.post(BeginEvent(task, taskInfo))
@@ -1384,17 +1400,32 @@ class DAGScheduler(
     if (rddPrefs.nonEmpty) {
       return rddPrefs.map(TaskLocation(_))
     }
-    // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
-    // that has any placement preferences. Ideally we would choose based on transfer sizes,
-    // but this will do for now.
+
     rdd.dependencies.foreach {
       case n: NarrowDependency[_] =>
+        // If the RDD has narrow dependencies, pick the first partition of the first narrow dep
+        // that has any placement preferences. Ideally we would choose based on transfer sizes,
+        // but this will do for now.
         for (inPart <- n.getParents(partition)) {
           val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
           if (locs != Nil) {
             return locs
           }
         }
+      case s: ShuffleDependency[_, _, _] =>
+        // For shuffle dependencies, pick locations which have at least REDUCER_PREF_LOCS_FRACTION
+        // of data as preferred locations
+        if (shuffleLocalityEnabled &&
+            rdd.partitions.size < SHUFFLE_PREF_REDUCE_THRESHOLD &&
+            s.rdd.partitions.size < SHUFFLE_PREF_MAP_THRESHOLD) {
+          // Get the preferred map output locations for this reducer
+          val topLocsForReducer = mapOutputTracker.getLocationsWithLargestOutputs(s.shuffleId,
+            partition, rdd.partitions.size, REDUCER_PREF_LOCS_FRACTION)
+          if (topLocsForReducer.nonEmpty) {
+            return topLocsForReducer.get.map(loc => TaskLocation(loc.host, loc.executorId))
+          }
+        }
+
       case _ =>
     }
     Nil
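
The feature is enabled by default; a minimal sketch of turning it off with the flag
introduced above:

  import org.apache.spark.{SparkConf, SparkContext}

  // Sketch: disable locality preferences for reduce tasks. The app name is
  // illustrative; the flag name comes from the patch above.
  val conf = new SparkConf()
    .setAppName("reduce-locality-off")
    .set("spark.shuffle.reduceLocality.enabled", "false")
  val sc = new SparkContext(conf)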

http://git-wip-us.apache.org/repos/asf/spark/blob/96a7c888/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
index 1fab696..7a19611 100644
--- a/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
@@ -205,4 +205,39 @@ class MapOutputTrackerSuite extends SparkFunSuite {
 //    masterTracker.stop() // this throws an exception
     rpcEnv.shutdown()
   }
+
+  test("getLocationsWithLargestOutputs with multiple outputs in same machine") {
+    val rpcEnv = createRpcEnv("test")
+    val tracker = new MapOutputTrackerMaster(conf)
+    tracker.trackerEndpoint = rpcEnv.setupEndpoint(MapOutputTracker.ENDPOINT_NAME,
+      new MapOutputTrackerMasterEndpoint(rpcEnv, tracker, conf))
+    // Setup 3 map tasks
+    // on hostA with output size 2
+    // on hostA with output size 2
+    // on hostB with output size 3
+    tracker.registerShuffle(10, 3)
+    tracker.registerMapOutput(10, 0, MapStatus(BlockManagerId("a", "hostA", 1000),
+        Array(2L)))
+    tracker.registerMapOutput(10, 1, MapStatus(BlockManagerId("a", "hostA", 1000),
+        Array(2L)))
+    tracker.registerMapOutput(10, 2, MapStatus(BlockManagerId("b", "hostB", 1000),
+        Array(3L)))
+
+    // When the threshold is 50%, only host A should be returned as a preferred location
+    // as it has 4 out of 7 bytes of output.
+    val topLocs50 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.5)
+    assert(topLocs50.nonEmpty)
+    assert(topLocs50.get.size === 1)
+    assert(topLocs50.get.head === BlockManagerId("a", "hostA", 1000))
+
+    // When the threshold is 20%, both hosts should be returned as preferred locations.
+    val topLocs20 = tracker.getLocationsWithLargestOutputs(10, 0, 1, 0.2)
+    assert(topLocs20.nonEmpty)
+    assert(topLocs20.get.size === 2)
+    assert(topLocs20.get.toSet ===
+           Seq(BlockManagerId("a", "hostA", 1000), BlockManagerId("b", "hostB", 1000)).toSet)
+
+    tracker.stop()
+    rpcEnv.shutdown()
+  }
 }
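
The arithmetic behind the two assertions above, spelled out as a sketch:

  // hostA holds 2 + 2 = 4 of the 7 total bytes for reducer 0; hostB holds 3.
  val sizes = Map("hostA" -> 4L, "hostB" -> 3L)
  val total = sizes.values.sum                        // 7
  val fractions = sizes.mapValues(_.toDouble / total)
  // hostA -> ~0.571, hostB -> ~0.429: only hostA clears the 0.5 threshold,
  // while both clear 0.2, matching topLocs50 and topLocs20.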

http://git-wip-us.apache.org/repos/asf/spark/blob/96a7c888/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 47b2868..833b600 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -490,8 +490,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-        (Success, makeMapStatus("hostA", 1)),
-        (Success, makeMapStatus("hostB", 1))))
+        (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+        (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // the 2nd ResultTask failed
     complete(taskSets(1), Seq(
         (Success, 42),
@@ -501,7 +501,7 @@ class DAGSchedulerSuite
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", reduceRdd.partitions.size))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
@@ -517,8 +517,8 @@ class DAGSchedulerSuite
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", reduceRdd.partitions.size)),
+      (Success, makeMapStatus("hostB", reduceRdd.partitions.size))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))
@@ -560,18 +560,18 @@ class DAGSchedulerSuite
     assert(newEpoch > oldEpoch)
     val taskSet = taskSets(0)
     // should be ignored for being too old
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a non-failed host
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostB", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should be ignored for being too old
-    runEvent(CompletionEvent(
-      taskSet.tasks(0), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     // should work because it's a new epoch
     taskSet.tasks(1).epoch = newEpoch
-    runEvent(CompletionEvent(
-      taskSet.tasks(1), Success, makeMapStatus("hostA", 1), null, createFakeTaskInfo(), null))
+    runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
+      reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
     complete(taskSets(1), Seq((Success, 42), (Success, 43)))
@@ -800,6 +800,50 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  test("reduce tasks should be placed locally with map output") {
+    // Create a shuffleMapRdd with 1 partition
+    val shuffleMapRdd = new MyRDD(sc, 1, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+    complete(taskSets(0), Seq(
+        (Success, makeMapStatus("hostA", 1))))
+    assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
+           Array(makeBlockManagerId("hostA")))
+
+    // Reducer should run on the same host that the map task ran on
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(Seq("hostA")))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
+  test("reduce task locality preferences should only include machines with largest map outputs") {
+    val numMapTasks = 4
+    // Create a shuffleMapRdd with more partitions
+    val shuffleMapRdd = new MyRDD(sc, numMapTasks, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
+    submit(reduceRdd, Array(0))
+
+    val statuses = (1 to numMapTasks).map { i =>
+      (Success, makeMapStatus("host" + i, 1, (10*i).toByte))
+    }
+    complete(taskSets(0), statuses)
+
+    // Reducer should prefer the last 3 hosts as they have 20%, 30% and 40% of data
+    val hosts = (1 to numMapTasks).map(i => "host" + i).reverse.take(numMapTasks - 1)
+
+    val reduceTaskSet = taskSets(1)
+    assertLocations(reduceTaskSet, Seq(hosts))
+    complete(reduceTaskSet, Seq((Success, 42)))
+    assert(results === Map(0 -> 42))
+    assertDataStructuresEmpty
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
@@ -807,12 +851,12 @@ class DAGSchedulerSuite
   private def assertLocations(taskSet: TaskSet, hosts: Seq[Seq[String]]) {
     assert(hosts.size === taskSet.tasks.size)
     for ((taskLocs, expectedLocs) <- taskSet.tasks.map(_.preferredLocations).zip(hosts)) {
-      assert(taskLocs.map(_.host) === expectedLocs)
+      assert(taskLocs.map(_.host).toSet === expectedLocs.toSet)
     }
   }
 
-  private def makeMapStatus(host: String, reduces: Int): MapStatus =
-    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(2))
+  private def makeMapStatus(host: String, reduces: Int, sizes: Byte = 2): MapStatus =
+    MapStatus(makeBlockManagerId(host), Array.fill[Long](reduces)(sizes))
 
   private def makeBlockManagerId(host: String): BlockManagerId =
     BlockManagerId("exec-" + host, host, 12345)
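
And the expected preferences in the four-map-task test above, worked out as a sketch
(MapStatus compresses block sizes, but the relative shares survive for these values):

  // makeMapStatus("host" + i, 1, (10 * i).toByte) yields block sizes 10, 20, 30, 40.
  val sizes = Seq(10L, 20L, 30L, 40L)
  val total = sizes.sum                            // 100
  val fractions = sizes.map(_.toDouble / total)    // 0.1, 0.2, 0.3, 0.4
  // host1's 10% share falls below REDUCER_PREF_LOCS_FRACTION (0.2), so only
  // host2, host3 and host4 become preferred locations for the reduce task.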

