Posted to user@spark.apache.org by rapelly kartheek <ka...@gmail.com> on 2014/09/05 11:19:51 UTC

question on replicate() in blockManager.scala

Hi,

  var cachedPeers: Seq[BlockManagerId] = null
  private def replicate(blockId: String, data: ByteBuffer, level: StorageLevel) {
    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
    if (cachedPeers == null) {
      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
    }
    for (peer: BlockManagerId <- cachedPeers) {
      val start = System.nanoTime
      data.rewind()
      logDebug("Try to replicate BlockId " + blockId + " once; The size of the data is "
        + data.limit() + " Bytes. To node: " + peer)
      if (!BlockManagerWorker.syncPutBlock(PutBlock(blockId, data, tLevel),
        new ConnectionManagerId(peer.host, peer.port))) {
        logError("Failed to call syncPutBlock to " + peer)
      }
      logDebug("Replicated BlockId " + blockId + " once used " +
        (System.nanoTime - start) / 1e6 + " s; The size of the data is " +
        data.limit() + " bytes.")
    }
  }


I follow the flow of this code, but I don't see any method being called
that actually writes the data to the set of peers chosen for replication.

Where exactly is the replication happening?

Thank you!!
-Karthik

Re: question on replicate() in blockManager.scala

Posted by Aaron Davidson <il...@gmail.com>.
Looks like that's BlockManagerWorker.syncPutBlock(), which is in an if
check, perhaps obscuring its existence.
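
To illustrate the pattern, here is a minimal standalone sketch, not Spark
code. FakePeer, syncPut and ReplicateSketch are hypothetical names made up
for this example. It assumes, as in the snippet above, that the put call
both performs the write and returns a success flag, so the call inside the
if condition is where the data is sent and the body of the if only handles
failure.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a remote peer; this is not a Spark class.
final class FakePeer(val name: String, val accepts: Boolean) {
  val stored = mutable.Map.empty[String, Array[Byte]]
}

object ReplicateSketch {
  // Hypothetical analogue of syncPutBlock: performs the write (a side
  // effect) and reports whether the peer accepted it.
  def syncPut(peer: FakePeer, blockId: String, data: Array[Byte]): Boolean = {
    if (peer.accepts) {
      peer.stored(blockId) = data
      true
    } else {
      false
    }
  }

  // Same shape as replicate(): the write happens while the if condition
  // is evaluated; the body only records the failure.
  def replicate(blockId: String, data: Array[Byte], peers: Seq[FakePeer]): Seq[String] = {
    val failures = mutable.ArrayBuffer.empty[String]
    for (peer <- peers) {
      if (!syncPut(peer, blockId, data)) {
        failures += peer.name
      }
    }
    failures.toSeq
  }
}
```

Calling ReplicateSketch.replicate with one accepting and one rejecting peer
stores the block on the first and returns only the name of the second, even
though no statement in the loop body writes anything.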

