You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@spark.apache.org by Qiuzhuang Lian <qi...@gmail.com> on 2014/02/27 10:28:21 UTC
question on removeRdd method in BlockManagerMasterActor.scala
Hi,
I have a question about the removeRdd method in BlockManagerMasterActor.scala,
specifically about how it asks the slave actors to remove an RDD.
in this piece of code,
Future.sequence(blockManagerInfo.values.map { bm =>
bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
}.toSeq)
it asks every BlockManagerInfo to remove the RDD. Shouldn't we
filter blockManagerInfo so that we only pick the BlockManagerInfo entries that
actually contain that RDD?
I made the following changes; do they make sense?
E:\projects\amplab\spark>git diff
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
old mode 100644
new mode 100755
index a999d76..fccc5a9
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
// Find all blocks for the given RDD, remove the block from both
blockLocations and
// the blockManagerInfo that is tracking the blocks.
val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId ==
rddId)
+ val bmInfos = new
mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] =
blockLocations.get(blockId)
- bms.foreach(bm =>
blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+ bms.foreach{ bm =>
+ blockManagerInfo.get(bm).foreach{ bmInfo =>
+ bmInfos += bmInfo
+ bmInfo.removeBlock(blockId)
+ }
+ }
blockLocations.remove(blockId)
}
@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
// The dispatcher is used as an implicit argument into the Future
sequence construction.
import context.dispatcher
val removeMsg = RemoveRdd(rddId)
- Future.sequence(blockManagerInfo.values.map { bm =>
+ Future.sequence(bmInfos.map { bm =>
bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
}.toSeq)
}
Thanks,
Qiuzhuang
Fwd: question on removeRdd method in BlockManagerMasterActor.scala
Posted by Qiuzhuang Lian <qi...@gmail.com>.
Sorry, I should have sent this to the new Spark dev address instead.
Hi,
I have a question about the removeRdd method in BlockManagerMasterActor.scala,
specifically about how it asks the slave actors to remove an RDD.
in this piece of code,
Future.sequence(blockManagerInfo.values.map { bm =>
bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
}.toSeq)
it asks every BlockManagerInfo to remove the RDD. Shouldn't we
filter blockManagerInfo so that we only pick the BlockManagerInfo entries that
actually contain that RDD?
I made the following changes; do they make sense?
E:\projects\amplab\spark>git diff
diff --git
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
old mode 100644
new mode 100755
index a999d76..fccc5a9
---
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -128,9 +128,15 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
// Find all blocks for the given RDD, remove the block from both
blockLocations and
// the blockManagerInfo that is tracking the blocks.
val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId ==
rddId)
+ val bmInfos = new
mutable.HashSet[BlockManagerMasterActor.BlockManagerInfo]
blocks.foreach { blockId =>
val bms: mutable.HashSet[BlockManagerId] =
blockLocations.get(blockId)
- bms.foreach(bm =>
blockManagerInfo.get(bm).foreach(_.removeBlock(blockId)))
+ bms.foreach{ bm =>
+ blockManagerInfo.get(bm).foreach{ bmInfo =>
+ bmInfos += bmInfo
+ bmInfo.removeBlock(blockId)
+ }
+ }
blockLocations.remove(blockId)
}
@@ -138,7 +144,7 @@ class BlockManagerMasterActor(val isLocal: Boolean,
conf: SparkConf) extends Act
// The dispatcher is used as an implicit argument into the Future
sequence construction.
import context.dispatcher
val removeMsg = RemoveRdd(rddId)
- Future.sequence(blockManagerInfo.values.map { bm =>
+ Future.sequence(bmInfos.map { bm =>
bm.slaveActor.ask(removeMsg)(akkaTimeout).mapTo[Int]
}.toSeq)
}
Thanks,
Qiuzhuang