Posted to dev@spark.apache.org by rapelly kartheek <ka...@gmail.com> on 2014/11/07 11:11:18 UTC
How spark/*/Storage/BlockManagerMaster.askDriverWithReply() responds
to various query messages
Hi,
I am trying to understand how
/spark/*/Storage/BlockManagerMaster.askDriverWithReply() works.
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
  val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
  if (result.length != numPeers) {
    throw new SparkException(
      "Error getting peers, only got " + result.size + " instead of " + numPeers)
  }
  result
}
Here, getPeers calls askDriverWithReply().
private def askDriverWithReply[T](message: Any): T = {
  // TODO: Consider removing multiple attempts
  if (driverActor == null) {
    throw new SparkException("Error sending message to BlockManager as driverActor is null " +
      "[message = " + message + "]")
  }
  var attempts = 0
  var lastException: Exception = null
  while (attempts < AKKA_RETRY_ATTEMPTS) {
    attempts += 1
    try {
      val future = driverActor.ask(message)(timeout)
      val result = Await.result(future, timeout)
      if (result == null) {
        throw new SparkException("BlockManagerMaster returned null")
      }
      return result.asInstanceOf[T]
    } catch {
      case ie: InterruptedException => throw ie
      case e: Exception =>
        lastException = e
        logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
    }
    Thread.sleep(AKKA_RETRY_INTERVAL_MS)
  }
  throw new SparkException(
    "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
}
Here, the getPeers method calls askDriverWithReply() with the message
GetPeers(), and the driver returns the BlockManagerIds.
val future = driverActor.ask(message)(timeout)
val result = Await.result(future, timeout)
Here, we obtain "result". But I couldn't find the definition of ask() that
processes the GetPeers() message. Can someone please tell me how and where
'result' is being constructed?
Thank you!
Karthik
Re: How spark/*/Storage/BlockManagerMaster.askDriverWithReply()
responds to various query messages
Posted by Imran Rashid <im...@therashids.com>.
ask() is available on every ActorRef. It comes from the Akka library (the
akka.pattern.ask pattern), which Spark uses for a lot of the communication
between its components. The reply itself is built by whichever actor
receives the message, not by ask().
There is some documentation on ask() here (go to the section on "Send
messages"):
http://doc.akka.io/docs/akka/2.2.3/scala/actors.html
Though if you are totally new to it, you might want to work through a
simple Akka tutorial first, before diving into the docs.
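To make the mechanics concrete, here is a rough sketch of the ask pattern
using only the Scala standard library. It is not Akka's implementation and
not Spark's code: ToyMaster, ToyRef, PeerId and AskSketch are hypothetical
names invented for the illustration, and a plain thread stands in for an
actor's mailbox. The point is only that ask() hands back a Future, and the
value inside it is whatever the receiving side's message handler replies
with. In Spark the corresponding handler should be the receive method of the
actor behind driverActor (BlockManagerMasterActor, as far as I can tell for
releases of this period), so that is the place to look for the GetPeers case.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-ins for Spark's id and message types.
case class PeerId(host: String, port: Int)
case class GetPeers(requester: PeerId, numPeers: Int)

// Toy "actor": plays the role of the actor whose receive block handles messages.
class ToyMaster(known: Seq[PeerId]) {
  // The reply (the eventual "result") is constructed here, on the receiving side.
  def receive(message: Any, reply: Any => Unit): Unit = message match {
    case GetPeers(requester, n) =>
      reply(known.filterNot(_ == requester).take(n))
  }
}

// Toy reference: ask() returns a Future that the receiver's reply completes.
class ToyRef(target: ToyMaster) {
  def ask(message: Any): Future[Any] = {
    val promise = Promise[Any]()
    // A plain thread imitates the message being processed asynchronously.
    val worker = new Thread(new Runnable {
      def run(): Unit = target.receive(message, r => { promise.trySuccess(r); () })
    })
    worker.start()
    promise.future
  }
}

object AskSketch {
  // Mirrors the two lines from askDriverWithReply: ask, then block on the Future.
  def peersFor(ref: ToyRef, me: PeerId, n: Int): Seq[PeerId] = {
    val future = ref.ask(GetPeers(me, n))
    Await.result(future, 5.seconds).asInstanceOf[Seq[PeerId]]
  }
}
```

With a ToyMaster that knows three peers, AskSketch.peersFor(ref, me, 2)
blocks until the handler has replied and then returns the two peers other
than the requester. The caller never sees how the reply was built, which is
why searching for a definition of ask() that handles GetPeers turns up
nothing.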