You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean Owen (JIRA)" <ji...@apache.org> on 2015/06/19 12:44:04 UTC

[jira] [Updated] (SPARK-7332) RpcCallContext.sender has a different name from the original sender's name

     [ https://issues.apache.org/jira/browse/SPARK-7332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-7332:
-----------------------------
    Target Version/s:   (was: 1.4.0)

> RpcCallContext.sender has a different name from the original sender's name
> --------------------------------------------------------------------------
>
>                 Key: SPARK-7332
>                 URL: https://issues.apache.org/jira/browse/SPARK-7332
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.0
>            Reporter: Qiping Li
>            Assignee: Shixiong Zhu
>            Priority: Critical
>
> In the function {{receiveAndReply}} of {{RpcEndpoint}}, we get the sender of the received message through {{context.sender}}. But this doesn't work because we don't get the right {{RpcEndpointRef}}. It's name is different from the original sender's name, so the path is different.
> Here is the code to test it:
> {code}
> case class Greeting(who: String)
> class GreetingActor(override val rpcEnv: RpcEnv) extends RpcEndpoint with Logging {
>   override def receiveAndReply(context: RpcCallContext) : PartialFunction[Any, Unit] = {
>     case Greeting(who) =>
>       logInfo("Hello " + who)
>       logInfo(s"${context.sender.name}")
>   }
> }
> class ToSend(override val rpcEnv: RpcEnv, greeting: RpcEndpointRef) extends RpcEndpoint with Logging {
>   override def onStart(): Unit = {
>     logInfo(s"${self.name}")
>     greeting.ask(Greeting("Charlie Parker"))
>   }
> }
> object RpcEndpointNameTest {
>   def main(args: Array[String]): Unit = {
>     val actorSystemName = "driver"
>     val conf = new SparkConf
>     val rpcEnv = RpcEnv.create(actorSystemName, "localhost", 0, conf, new SecurityManager(conf))
>     val greeter = rpcEnv.setupEndpoint("greeter", new GreetingActor(rpcEnv))
>     rpcEnv.setupEndpoint("toSend", new ToSend(rpcEnv, greeter))
>   }
> }
> {code}
> The result was:
> {code}
> toSend
> Hello Charlie Parker
> $a
> {code}
> I test the above code using akka with the following code:
> {code}
> case class Greeting(who: String)
> class GreetingActor extends Actor with ActorLogging {
>   def receive = {
>     case Greeting(who) =>
>       println("Hello " + who)
>       println(s"${sender.path} ${sender.path.name}")
>   }
> }
> class ToSend(greeting: ActorRef) extends Actor with ActorLogging {
>   override def preStart(): Unit = {
>     println(s"${self.path} ${self.path.name}")
>     greeting ! Greeting("Charlie Parker")
>   }
>   def receive = {
>     case _ =>
>       log.info("here")
>   }
> }
> object HelloWorld {
>   def main(args: Array[String]): Unit = {
>     val system = ActorSystem("MySystem")
>     val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
>     println(s"${greeter.path} ${greeter.path.name}")
>     val system2 = ActorSystem("MySystem2")
>     system2.actorOf(Props(classOf[ToSend], greeter), name = "toSend_2")
>   }
> }
> {code}
> And the result was:
> {code}
> akka://MySystem/user/greeter greeter
> akka://MySystem2/user/toSend_2 toSend_2
> Hello Charlie Parker
> akka://MySystem2/user/toSend_2 toSend_2
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org