You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Priya Ch <le...@gmail.com> on 2014/12/22 14:45:28 UTC
Spark exception when sending message to akka actor
Hi All,
I have Akka remote actors running on 2 nodes, and I submitted my Spark application
from node1. In the Spark code, inside one of the RDD transformations, I send a
message to the actor running on node1. My Spark code is as follows:
/**
 * Local actor that relays messages to a remote "MasterActor" resolved by
 * actor path, and acknowledges the original sender.
 *
 * Fix vs. the posted code: the actor-path string literal was line-wrapped by
 * the mail client, splitting it across two lines — a Scala string literal
 * cannot span lines like that, so the posted form does not compile. It is
 * rejoined into a single literal here.
 */
class ActorClient extends Actor with Serializable
{
  import context._

  // Selection of the remote master actor by its full akka.tcp path.
  // NOTE(review): actorSelection does not verify the remote actor exists;
  // a `!` to a dead path is silently dropped — confirm the master is up.
  val currentActor: ActorSelection =
    context.system.actorSelection(
      "akka.tcp://ActorSystem@192.168.145.183:2551/user/MasterActor")

  // Timeout used by the ask (`?`) pattern below.
  implicit val timeout = Timeout(10 seconds)

  def receive =
  {
    case msg: String =>
      if (msg.contains("Spark")) {
        // Fire-and-forget relay to the remote master, then ack the sender.
        currentActor ! msg
        sender ! "Local"
      } else {
        println("Received.." + msg)
        // Request/response with the remote master. NOTE(review): blocking
        // with Await inside an actor can starve the dispatcher; `pipeTo`
        // would be the non-blocking alternative. Kept to preserve the
        // original flow.
        val future = currentActor ? msg
        val result = Await.result(future, timeout.duration).asInstanceOf[String]
        if (result.contains("ACK"))
          sender ! "OK"
      }
    // NOTE(review): this case is unreachable — Akka handles PoisonPill
    // internally before it ever reaches user code; context.stop(self) here
    // never runs. Left in place to keep the visible contract unchanged.
    case PoisonPill => context.stop(self)
  }
}
/**
 * Spark driver program that sends messages to an Akka actor from inside RDD
 * map closures.
 *
 * Fixes vs. the posted code:
 *  - `textrddmap{...}` was missing the method dereference: `textrdd.map{...}`.
 *  - `actorSystem.stop(remoteActor)` referenced an undefined name; the only
 *    actor this object owns is `actor`.
 *
 * NOTE(review): the ExceptionInInitializerError in the attached logs is
 * caused by these map closures capturing `actor`/`actorSystem`, which are
 * members of this object. When a task deserializes the closure on an
 * executor, the JVM initializes SparkExec$ there and tries to build a whole
 * new ActorSystem (and a new ClientActor) on the executor node — that
 * initialization fails and every later reference reports
 * NoClassDefFoundError. Driver-side actors cannot be shared with executors
 * this way; send results back to the driver (e.g. collect/foreachPartition
 * with an executor-local connection) instead — TODO confirm intended design.
 */
object SparkExec extends Serializable
{
  implicit val timeout = Timeout(10 seconds)

  // Created eagerly during object initialization — see NOTE above: this runs
  // again on every executor JVM that touches a closure capturing `actor`.
  val actorSystem = ActorSystem("ClientActorSystem")
  val actor = actorSystem.actorOf(Props(classOf[ActorClient]), name = "ClientActor")

  def main(args: Array[String]): Unit =
  {
    val conf = new SparkConf().setAppName("DeepLearningSpark")
    val sc = new SparkContext(conf)

    val textrdd =
      sc.textFile("hdfs://IMPETUS-DSRV02:9000/deeplearning/sample24k.csv")

    // FIX: was `textrddmap{...}` (missing `.map`) in the posted code.
    val rdd1 = textrdd.map { line =>
      println("In Map...")
      val future = actor ? "Hello..Spark"
      val result =
        Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("Local")) {
        println("Recieved in map...." + result)
        //actorSystem.shutdown
      }
      (10)
    }

    val rdd2 = rdd1.map { x =>
      val future = actor ? "Done"
      val result = Await.result(future, timeout.duration).asInstanceOf[String]
      if (result.contains("OK"))
      {
        // FIX: was `actorSystem.stop(remoteActor)` — `remoteActor` is not
        // defined anywhere in this object; the local actor is `actor`.
        actorSystem.stop(actor)
        actorSystem.shutdown
      }
      (2)
    }

    rdd2.saveAsTextFile("/home/padma/SparkAkkaOut")
  }
}
In my ActorClient actor, I identify the remote actor through actorSelection
and send it the messages. Once the messages have been sent, in *rdd2*, after
receiving the ack from the remote actor, I stop the ActorClient actor and
shut down the ActorSystem.
The above code is throwing the following exception:
14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, IMPETUS-DSRV05.impetus.co.in):
java.lang.ExceptionInInitializerError:
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
14/12/22 19:04:36 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
(TID 0, IMPETUS-DSRV05.impetus.co.in): java.lang.NoClassDefFoundError:
Could not initialize class com.impetus.spark.SparkExec$
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:166)
com.impetus.spark.SparkExec$$anonfun$2.apply(SparkExec.scala:159)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984)
org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
java.lang.Thread.run(Thread.java:722)
Could anyone please help me with this? My goal is to send a message
to an actor from within a Spark RDD transformation and, after the messages
have been sent, shut down the ActorSystem.
Thanks,
Padma Ch