Posted to dev@spark.apache.org by Priya Ch <le...@gmail.com> on 2014/12/22 14:45:28 UTC

Spark exception when sending message to akka actor

Hi All,

I have Akka remote actors running on 2 nodes. I submitted a Spark application
from node1. In the Spark code, inside one of the RDD transformations, I am
sending a message to the actor running on node1. My Spark code is as follows:




import akka.actor.{Actor, ActorSelection, ActorSystem, PoisonPill, Props}
import akka.pattern.ask
import akka.util.Timeout
import org.apache.spark.{SparkConf, SparkContext}
import scala.concurrent.Await
import scala.concurrent.duration._

class ActorClient extends Actor with Serializable
{
  import context._

  // Remote actor running on node1, resolved by its full Akka path
  val currentActor: ActorSelection =
    context.system.actorSelection(
      "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")
  implicit val timeout = Timeout(10.seconds)

  def receive =
  {
    case msg: String =>
      if (msg.contains("Spark")) {
        // Relay the message to the remote actor and ack the local sender
        currentActor ! msg
        sender ! "Local"
      } else {
        println("Received.." + msg)
        // Ask the remote actor and block for its acknowledgement
        val future = currentActor ? msg
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        if (result.contains("ACK"))
          sender ! "OK"
      }
    case PoisonPill => context.stop(self)
  }
}

object SparkExec extends Serializable
{
  implicit val timeout = Timeout(10.seconds)
  val actorSystem = ActorSystem("ClientActorSystem")
  val actor = actorSystem.actorOf(Props(classOf[ActorClient]), name = "ClientActor")

  def main(args: Array[String]) =
  {
    val conf = new SparkConf().setAppName("DeepLearningSpark")
    val sc = new SparkContext(conf)

    val textrdd = sc.textFile("hdfs://IMPETUS-DSRV02:9000/deeplearning/sample24k.csv")

    val rdd1 = textrdd.map { line =>
      println("In Map...")
      // Ask the local ActorClient, which relays to the remote actor
      val future = actor ? "Hello..Spark"
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("Local")) {
        println("Received in map...." + result)
        //actorSystem.shutdown
      }
      10
    }

    val rdd2 = rdd1.map { x =>
      val future = actor ? "Done"
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("OK")) {
        // Stop the client actor and shut down the ActorSystem once acknowledged
        actorSystem.stop(actor)
        actorSystem.shutdown
      }
      2
    }

    rdd2.saveAsTextFile("/home/padma/SparkAkkaOut")
  }
}

In my ActorClient actor, I identify the remote actor through actorSelection and
send it the message. Once the messages are sent, in *rdd2*, after receiving the
ack from the remote actor, I kill the ActorClient actor and shut down the
ActorSystem.
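
For clarity, here is a stripped-down, stand-alone sketch of the actorSelection/ask
pattern I am using, outside of Spark. The host, port and names below are just
placeholders to isolate the messaging part, not my real deployment:

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskSketch extends App {
  val system = ActorSystem("SketchSystem")
  implicit val timeout = Timeout(10.seconds)

  // Resolve the remote actor by its path (placeholder host/port)
  val remote = system.actorSelection(
    "akka.tcp://ActorSystem@127.0.0.1:2551/user/MasterActor")

  // Ask and block for the reply, the same way I do inside the RDD
  val reply = Await.result(remote ? "ping", timeout.duration)
  println("Got reply: " + reply)

  system.shutdown()
}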

The above code is throwing the following exception:




14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, IMPETUS-DSRV05.impetus.co.in): java.lang.ExceptionInInitializerError:
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)

14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, IMPETUS-DSRV05.impetus.co.in): java.lang.NoClassDefFoundError: Could not initialize class com.impetus.spark.SparkExec$
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
        com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
        org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        java.lang.Thread.run(Thread.java:722)


Could anyone please help me with this? My concern is that I want to send messages
to an actor from within a Spark RDD, and after the messages are sent, the
ActorSystem needs to be shut down.
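
In case it makes the question clearer: I was also wondering whether the actor
handling should instead be set up per partition on the executors, rather than
referencing the driver-side SparkExec singleton from inside the closures.
Something along these lines (only a rough, untested sketch on my side, with the
same imports as above and placeholder names):

val rdd2 = textrdd.mapPartitions { lines =>
  // Create the ActorSystem on the executor, once per partition
  val system = ActorSystem("ExecutorSideSystem")
  implicit val timeout = Timeout(10.seconds)
  val remote = system.actorSelection(
    "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")

  // Force the iterator before shutting the system down
  val results = lines.map { line =>
    Await.result(remote ? line, timeout.duration).toString
  }.toList

  system.shutdown()
  results.iterator
}

Would that be a saner direction, or is there a recommended way to do this?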

Thanks,
Padma Ch