You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/12/21 23:03:31 UTC

spark git commit: [SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts

Repository: spark
Updated Branches:
  refs/heads/master 4883a5087 -> 935f46630


[SPARK-12392][CORE] Optimize a location order of broadcast blocks by considering preferred local hosts

When multiple workers run on the same host, we can avoid unnecessary remote access for broadcasts: block managers now try to fetch broadcast blocks from block managers on the same host before falling back to remote hosts.

Author: Takeshi YAMAMURO <li...@gmail.com>

Closes #10346 from maropu/OptimizeBlockLocationOrder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/935f4663
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/935f4663
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/935f4663

Branch: refs/heads/master
Commit: 935f46630685306edbdec91f71710703317fe129
Parents: 4883a50
Author: Takeshi YAMAMURO <li...@gmail.com>
Authored: Mon Dec 21 14:02:40 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Mon Dec 21 14:03:23 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala  | 12 +++++++++++-
 .../apache/spark/storage/BlockManagerSuite.scala | 19 ++++++++++++++++++-
 2 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6074fc5..b5b7804 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -578,9 +578,19 @@ private[spark] class BlockManager(
     doGetRemote(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
   }
 
+  /**
+   * Return the block's locations from the master in shuffled order, with locations on this
+   * block manager's host placed first, since multiple block managers can share a host.
+   */
+  private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
+    val locs = Random.shuffle(master.getLocations(blockId))
+    val (preferredLocs, otherLocs) = locs.partition { loc => blockManagerId.host == loc.host }
+    preferredLocs ++ otherLocs
+  }
+
   private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
     require(blockId != null, "BlockId is null")
-    val locations = Random.shuffle(master.getLocations(blockId))
+    val locations = getLocations(blockId)
     var numFetchFailures = 0
     for (loc <- locations) {
       logDebug(s"Getting remote block $blockId from $loc")

http://git-wip-us.apache.org/repos/asf/spark/blob/935f4663/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 53991d8..bf49be3 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -26,6 +26,7 @@ import scala.language.implicitConversions
 import scala.language.postfixOps
 
 import org.mockito.Mockito.{mock, when}
+import org.mockito.{Matchers => mc}
 import org.scalatest._
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.Timeouts._
@@ -66,7 +67,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
 
   private def makeBlockManager(
       maxMem: Long,
-      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+      name: String = SparkContext.DRIVER_IDENTIFIER,
+      master: BlockManagerMaster = this.master): BlockManager = {
     val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
     val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
     val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
@@ -451,6 +453,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     assert(list2DiskGet.get.readMethod === DataReadMethod.Disk)
   }
 
+  test("optimize a location order of blocks") {
+    val localHost = Utils.localHostName()
+    val otherHost = "otherHost"
+    val bmMaster = mock(classOf[BlockManagerMaster])
+    val bmId1 = BlockManagerId("id1", localHost, 1)  // on the local host
+    val bmId2 = BlockManagerId("id2", localHost, 2)  // on the local host
+    val bmId3 = BlockManagerId("id3", otherHost, 3)  // on a different host
+    when(bmMaster.getLocations(mc.any[BlockId])).thenReturn(Seq(bmId1, bmId2, bmId3))
+    // getLocations is private in BlockManager, so call it through PrivateMethod/invokePrivate.
+    val blockManager = makeBlockManager(128, "exec", bmMaster)
+    val getLocations = PrivateMethod[Seq[BlockManagerId]]('getLocations)
+    val locations = blockManager invokePrivate getLocations(BroadcastBlockId(0))
+    assert(locations.map(_.host) === Seq(localHost, localHost, otherHost))
+  }
+
   test("SPARK-9591: getRemoteBytes from another location when Exception throw") {
     val origTimeoutOpt = conf.getOption("spark.network.timeout")
     try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org