Posted to dev@spark.apache.org by rapelly kartheek <ka...@gmail.com> on 2014/11/07 11:11:18 UTC

How spark/*/Storage/BlockManagerMaster.askDriverWithReply() responds to various query messages

 Hi,

I am trying to understand how
/spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.

    def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
      val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
      if (result.length != numPeers) {
        throw new SparkException(
          "Error getting peers, only got " + result.size + " instead of " + numPeers)
      }
      result
    }

Here, getPeers calls askDriverWithReply().

    private def askDriverWithReply[T](message: Any): T = {
      // TODO: Consider removing multiple attempts
      if (driverActor == null) {
        throw new SparkException("Error sending message to BlockManager as driverActor is null " +
          "[message = " + message + "]")
      }
      var attempts = 0
      var lastException: Exception = null
      while (attempts < AKKA_RETRY_ATTEMPTS) {
        attempts += 1
        try {
          val future = driverActor.ask(message)(timeout)
          val result = Await.result(future, timeout)
          if (result == null) {
            throw new SparkException("BlockManagerMaster returned null")
          }
          return result.asInstanceOf[T]
        } catch {
          case ie: InterruptedException => throw ie
          case e: Exception =>
            lastException = e
            logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
        }
        Thread.sleep(AKKA_RETRY_INTERVAL_MS)
      }

      throw new SparkException(
        "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
    }
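
The retry logic in askDriverWithReply() is independent of Akka and can be looked at on its own. Below is a minimal sketch of the same loop using only the Scala standard library. The names RetrySketch, retry, maxAttempts and intervalMs are hypothetical stand-ins (for AKKA_RETRY_ATTEMPTS and AKKA_RETRY_INTERVAL_MS), and a plain RuntimeException stands in for SparkException; none of this is Spark code.

```scala
import scala.util.control.NonFatal

object RetrySketch {
  // Hypothetical helper: run `op` up to `maxAttempts` times. After each
  // failed attempt, remember the exception and sleep before trying again.
  // `op` receives the 1-based attempt number.
  def retry[T](maxAttempts: Int, intervalMs: Long)(op: Int => T): T = {
    var attempts = 0
    var lastException: Throwable = null
    while (attempts < maxAttempts) {
      attempts += 1
      try {
        // Success: leave the loop immediately with the result.
        return op(attempts)
      } catch {
        // Interrupts are never retried, mirroring the loop quoted above.
        case ie: InterruptedException => throw ie
        case NonFatal(e) => lastException = e
      }
      Thread.sleep(intervalMs)
    }
    // Every attempt failed: report the last failure as the cause.
    throw new RuntimeException("Failed after " + maxAttempts + " attempts", lastException)
  }

  def main(args: Array[String]): Unit = {
    // Fails on attempts 1 and 2, succeeds on attempt 3.
    val value = retry(maxAttempts = 3, intervalMs = 10) { attempt =>
      if (attempt < 3) throw new IllegalStateException("attempt " + attempt + " failed")
      "ok"
    }
    println(value)
  }
}
```

As in the quoted method, only the last exception is kept, so earlier failures are visible only through whatever is logged at the time they occur.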

Here, the getPeers method calls askDriverWithReply() with a GetPeers() message,
and the driver returns the BlockManagerIds.

    val future = driverActor.ask(message)(timeout)
    val result = Await.result(future, timeout)

Here, we obtain "result". However, I couldn't find the definition of ask() that
processes the GetPeers() message. Can someone please tell me how and where
"result" is constructed?

Thank you!!
Karthik

Re: How spark/*/Storage/BlockManagerMaster.askDriverWithReply() responds to various query messages

Posted by Imran Rashid <im...@therashids.com>.
ask() is available on every actor reference.  It comes from the Akka library,
which Spark uses for much of the communication between its components.

There is some documentation on ask() here (go to the section on "Send
messages"):
http://doc.akka.io/docs/akka/2.2.3/scala/actors.html

though if you are totally new to it, you might want to work through a
simple akka tutorial first, before diving into the docs.
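
To make the mechanism concrete: an ask() does not compute the reply itself. It hands the message to the receiving side and returns a future, and the future is completed with whatever the receiver's message handler sends back. The sketch below models that with the Scala standard library only. It is not Akka's implementation, and ToyActor, knownPeers and the String ids are hypothetical stand-ins for the actor, its state and BlockManagerId. In Akka itself, ask is brought in with `import akka.pattern.ask`, and the receiving actor replies with `sender ! reply` from its receive method. For GetPeers, that handler presumably lives in the master-side actor in the same storage package (BlockManagerMasterActor, which is an assumption here, not something stated in this thread).

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AskSketch {
  // Hypothetical message type standing in for Spark's GetPeers.
  final case class GetPeers(requester: String, numPeers: Int)

  // Toy stand-in for an actor: a message handler that runs on its own thread.
  final class ToyActor(receive: PartialFunction[Any, Any]) {
    // "ask": create a promise, deliver the message, return the future at once.
    // The handler's return value plays the role of `sender ! reply`.
    def ask(message: Any): Future[Any] = {
      val reply = Promise[Any]()
      val worker = new Thread(new Runnable {
        def run(): Unit = {
          // An unhandled message or a handler error fails the future
          // instead of leaving the caller to time out.
          reply.complete(Try(receive(message)))
        }
      })
      worker.setDaemon(true)
      worker.start()
      reply.future
    }
  }

  // State owned by the receiving side (hypothetical data).
  val knownPeers: Seq[String] = Seq("bm-1", "bm-2", "bm-3")

  // This handler is where "result" is constructed.
  val master = new ToyActor({
    case GetPeers(requester, n) => knownPeers.filterNot(_ == requester).take(n)
  })

  // Caller side, shaped like the ask-then-await steps in askDriverWithReply().
  def getPeers(requester: String, numPeers: Int): Seq[String] = {
    val future = master.ask(GetPeers(requester, numPeers))
    Await.result(future, 5.seconds).asInstanceOf[Seq[String]]
  }

  def main(args: Array[String]): Unit = println(getPeers("bm-1", 2))
}
```

So when searching the Spark source for where the reply is built, the place to look is the pattern match on GetPeers in the receiving actor, not the definition of ask().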
