Posted to user@spark.apache.org by rapelly kartheek <ka...@gmail.com> on 2014/09/05 11:19:51 UTC
question on replicate() in blockManager.scala
Hi,
var cachedPeers: Seq[BlockManagerId] = null

private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
  val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
  if (cachedPeers == null) {
    cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
  }
  for (peer: BlockManagerId <- cachedPeers) {
    val start = System.nanoTime
    data.rewind()
    logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
      + data.limit() + " Bytes. To node: " + peer)
    if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
      new ConnectionManagerId(peer.host, peer.port))) {
      logError("Failed to call syncPutBlock to " + peer)
    }
    logDebug("Replicated BlockId " + blockId + " once used " +
      (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
      data.limit() + " bytes.")
  }
}

I follow the flow of this code, but I don't see any method being called that
actually writes the data to the set of peers chosen for replication.
Where exactly is the replication happening?
Thank you!!
-Karthik
Re: question on replicate() in blockManager.scala
Posted by Aaron Davidson <il...@gmail.com>.
Looks like that's BlockManagerWorker.syncPutBlock(), which is called inside
the condition of an if check, perhaps obscuring its existence.
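
To make the point above concrete, here is a minimal, self-contained sketch of the same pattern: a side-effecting call placed in an if condition still runs, and only its Boolean result is tested. Peer, FakeWorker, syncPut and Replicator are hypothetical stand-ins invented for illustration; they are not Spark's classes and do not reproduce what syncPutBlock does internally.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a peer address; not Spark's BlockManagerId.
final case class Peer(host: String, port: Int)

object FakeWorker {
  // Records every (peer, blockId) pair that was "sent"; stands in for a remote store.
  val received: mutable.ArrayBuffer[(Peer, String)] =
    mutable.ArrayBuffer.empty[(Peer, String)]

  // Performs the "send" as a side effect and reports success as a Boolean,
  // mirroring the shape (not the implementation) of the call in the quoted snippet.
  def syncPut(blockId: String, peer: Peer): Boolean = {
    if (peer.port <= 0) {
      false // assumption for the demo: a non-positive port means "unreachable"
    } else {
      received += ((peer, blockId))
      true
    }
  }
}

object Replicator {
  // Sends the block to each peer and returns the peers for which the put failed.
  def replicate(blockId: String, peers: Seq[Peer]): List[Peer] = {
    val failed = mutable.ArrayBuffer.empty[Peer]
    for (peer <- peers) {
      // The call inside the condition is where the data is actually sent;
      // the if only inspects the result to decide whether to record a failure.
      if (!FakeWorker.syncPut(blockId, peer)) {
        failed += peer
      }
    }
    failed.toList
  }
}
```

Calling Replicator.replicate("rdd_0_0", Seq(Peer("a", 7077), Peer("b", 0))) records the block for peer "a" and returns a list containing only peer "b", even though syncPut never appears as a standalone statement.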