You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Qiuzhuang Lian <qi...@gmail.com> on 2014/02/27 10:28:21 UTC

question on removeRdd method in BlockManagerMasterActor.scala

Hi,

I have a question about the removeRdd method in BlockManagerMasterActor.scala,
specifically about how it asks the slave actors to remove an RDD.

In this piece of code:

    Future.sequence(blockManagerInfo.values.map { bm =>
      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
    }.toSeq)

it asks every registered block manager (every entry in blockManagerInfo) to
remove the RDD. Shouldn't we filter blockManagerInfo so that we only message
the BlockManagerInfo entries that actually contain blocks of that RDD?

I made the following changes to check whether this makes sense:

E:\projects\amplab\spark>git diff
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
b/core/src/main/scold mode 10064
4
new mode 100755
index a999d76..fccc5a9
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
     // Find all blocks for the given RDD, remove the block from both
blockLocations and
     // the blockManagerInfo that is tracking the blocks.
     val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId ==
rddId)
+    val bmInfos = new
mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
     blocks.foreach { blockId =>
       val bms: mutable.HashSet[BlockManagerId] =
blockLocations.get(blockId)
-      bms.foreach(bm =>
blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+      bms.foreach{ bm =>
+        blockManagerInfo.get(bm).foreach{ bmInfo =>
+          bmInfos += bmInfo
+          bmInfo.removeBlock(blockId)
+        }
+      }
       blockLocations.remove(blockId)
     }

@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
     // The dispatcher is used as an implicit argument into the Future
sequence construction.
     import context.dispatcher
     val removeMsg = RemoveRdd(rddId)
-    Future.sequence(blockManagerInfo.values.map { bm =>
+    Future.sequence(bmInfos.map { bm =>
       bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
     }.toSeq)
   }


Thanks,
Qiuzhuang

Fwd: question on removeRdd method in BlockManagerMasterActor.scala

Posted by Qiuzhuang Lian <qi...@gmail.com>.
Sorry, I should have sent this to the new Spark dev address instead.


Hi,

I have a question about the removeRdd method in BlockManagerMasterActor.scala,
specifically about how it asks the slave actors to remove an RDD.

In this piece of code:

    Future.sequence(blockManagerInfo.values.map { bm =>
      bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
    }.toSeq)

it asks every registered block manager (every entry in blockManagerInfo) to
remove the RDD. Shouldn't we filter blockManagerInfo so that we only message
the BlockManagerInfo entries that actually contain blocks of that RDD?

I made the following changes to check whether this makes sense:

E:\projects\amplab\spark>git diff
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
b/core/src/main/scold mode 10064
4
new mode 100755
index a999d76..fccc5a9
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
     // Find all blocks for the given RDD, remove the block from both
blockLocations and
     // the blockManagerInfo that is tracking the blocks.
     val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId ==
rddId)
+    val bmInfos = new
mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
     blocks.foreach { blockId =>
       val bms: mutable.HashSet[BlockManagerId] =
blockLocations.get(blockId)
-      bms.foreach(bm =>
blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+      bms.foreach{ bm =>
+        blockManagerInfo.get(bm).foreach{ bmInfo =>
+          bmInfos += bmInfo
+          bmInfo.removeBlock(blockId)
+        }
+      }
       blockLocations.remove(blockId)
     }

@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
     // The dispatcher is used as an implicit argument into the Future
sequence construction.
     import context.dispatcher
     val removeMsg = RemoveRdd(rddId)
-    Future.sequence(blockManagerInfo.values.map { bm =>
+    Future.sequence(bmInfos.map { bm =>
       bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
     }.toSeq)
   }


Thanks,
Qiuzhuang