Posted to user@spark.apache.org by Anton Brazhnyk <an...@genesys.com> on 2014/09/03 00:54:13 UTC

RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

It works with "spark.executor.extraClassPath": no exceptions in this case, and I’m getting the expected results.
But to me it limits/complicates the usage of Akka-based receivers a lot. Do you think it should be considered a bug?

Even if it’s not, can it be fixed/worked around by some classloading magic in either Spark or application code?


From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Friday, August 29, 2014 7:21 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

Can you try adding the JAR to the class path of the executors directly, by setting the config "spark.executor.extraClassPath" in the SparkConf? See the Configuration page: http://spark.apache.org/docs/latest/configuration.html#runtime-environment

I think what you guessed is correct. The Akka actor system is not aware of the classes that are dynamically added when the custom jar is added with setJars.

TD
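
Editorial note: the stack trace later in this thread shows akka.util.ClassLoaderObjectInputStream.resolveClass ending up in sun.misc.Launcher$AppClassLoader.loadClass, i.e. the message class is looked up through a loader that was set up before the application jar arrived. The following is a minimal JDK-only sketch of that mechanism, not Spark's or Akka's actual code. The names LoaderAwareDeserialization, LoaderObjectInputStream and BlindLoader are hypothetical, and CustomMessage is only a stand-in for the class named in the trace. BlindLoader simulates "a loader that cannot see the jar" by refusing to load that one class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

class LoaderAwareDeserialization {

    /** Stand-in for a message class that ships only in the application jar. */
    static class CustomMessage implements Serializable {
        private static final long serialVersionUID = 1L;
        final String text;
        CustomMessage(String text) { this.text = text; }
    }

    /** ObjectInputStream that resolves every class through one explicit loader. */
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(ClassLoader loader, InputStream in) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            return Class.forName(desc.getName(), false, loader);
        }
    }

    /** Simulates a loader created before the jar was added: it cannot see CustomMessage. */
    static class BlindLoader extends ClassLoader {
        BlindLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals(CustomMessage.class.getName())) {
                throw new ClassNotFoundException(name);
            }
            return super.loadClass(name, resolve);
        }
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] bytes, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new LoaderObjectInputStream(loader, new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader appLoader = LoaderAwareDeserialization.class.getClassLoader();
        byte[] bytes = serialize(new CustomMessage("hello"));

        // A loader that can see the class: deserialization succeeds.
        CustomMessage ok = (CustomMessage) deserialize(bytes, appLoader);
        System.out.println("visible loader: " + ok.text);

        // A loader that cannot see the class: same bytes, ClassNotFoundException.
        try {
            deserialize(bytes, new BlindLoader(appLoader));
        } catch (ClassNotFoundException e) {
            System.out.println("blind loader: ClassNotFoundException for " + e.getMessage());
        }
    }
}
```

The same bytes deserialize or fail depending only on which loader the stream consults, which is consistent with the report that putting the jar on the executor class path makes the exception go away.
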
On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk <an...@genesys.com> wrote:
Just checked it with 1.0.2
Still same exception.

From: Anton Brazhnyk [mailto:anton.brazhnyk@genesys.com]
Sent: Wednesday, August 27, 2014 6:46 PM
To: Tathagata Das
Cc: user@spark.apache.org
Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded jar

Sorry for the delayed answer; I was on vacation.
As I said, I was using a modified version of the launcher from the example. The modification is just about setting the Spark master URL in the code so as not to use the run-example script.
The launcher itself was in the attached zip (attaching it once more) as the ActorWordCount object.

From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Tuesday, August 05, 2014 11:32 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

How are you launching/submitting the program? Using spark-submit? Or some other script (can you provide that)?

TD

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk <an...@genesys.com> wrote:
I went through it once again to leave only the modification in question. Still the same exception.
I hope sources as a zip file (instead of GitHub) can still be tolerated. :)

Here is the stacktrace generated with these sources:
14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800
14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
        at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
-----Original Message-----
From: Tathagata Das [mailto:tathagata.das1565@gmail.com]
Sent: Tuesday, August 05, 2014 5:42 PM
To: Anton Brazhnyk
Cc: user@spark.apache.org
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

Can you show us the modified version? The reason could very well be what you suggest, but I want to understand what conditions lead to this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk <an...@genesys.com> wrote:
> Greetings,
>
>
>
> I modified the ActorWordCount example a little so that it uses a simple case
> class as the message for Streaming instead of a primitive string.
>
> I also modified the launch code to not use the run-example script, but to set
> the Spark master in the code and attach the jar (setJars(…)) with all the
> classes, including the new case class. It runs fine in local[*] mode
> but fails with ClassNotFoundException on a standalone cluster (stacktrace follows).
>
>
>
> I assume it’s a classloader problem and Akka remoting just doesn’t
> know about the classes coming to the executor from the attached jar.
> Am I right?
>
>
>
> I guess I could pass primitive values around and do my own
> (de)serialization but maybe there is a better way?
>
> What’s the correct way to build a custom Akka-based receiver that uses
> non-primitive messages?
>
>
>
>
>
> Here is the log excerpt with stacktrace:
>
> 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407211181800
> 14/08/04 20:59:41 ERROR Remoting: com.genesys.gpe.analytics.akka.messages.SubscribeAck
> java.lang.ClassNotFoundException: com.genesys.gpe.analytics.akka.messages.SubscribeAck
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
>         at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>         at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>         at scala.util.Try$.apply(Try.scala:161)
>         at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>         at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>         at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
> WBR,
>
> Anton
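
Editorial note: the fallback mentioned in the original message, passing primitive values around and doing one's own (de)serialization, can be sketched with the JDK alone. The idea is that only a byte[] crosses the remoting layer, and byte arrays are resolvable by any class loader, so the application class is needed only in code that runs after the application jar has been loaded. The names ManualMessageCodec and WordEvent and the two fields are hypothetical. This is an illustration of the workaround under those assumptions, not something the thread confirms was tried.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class ManualMessageCodec {

    /** Hypothetical application message; it is never Java-serialized itself. */
    static final class WordEvent {
        final String word;
        final long timestampMillis;

        WordEvent(String word, long timestampMillis) {
            this.word = word;
            this.timestampMillis = timestampMillis;
        }
    }

    /** Sender side: flatten the message into a byte[] payload. */
    static byte[] encode(WordEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(event.word);
            out.writeLong(event.timestampMillis);
        }
        return bytes.toByteArray();
    }

    /** Receiver side: rebuild the message in application code, reading fields in write order. */
    static WordEvent decode(byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            String word = in.readUTF();
            long timestampMillis = in.readLong();
            return new WordEvent(word, timestampMillis);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = encode(new WordEvent("spark", 1407211181800L));
        WordEvent back = decode(payload);
        System.out.println(back.word + " @ " + back.timestampMillis);
    }
}
```

The cost of this approach is that the field layout has to be kept in sync by hand on both sides, which is presumably why the original message asks whether there is a better way.
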



Re: [Streaming] Akka-based receiver with messages defined in uploaded jar

Posted by Tathagata Das <ta...@gmail.com>.
I am not sure if there is a quick fix for this, as the actor is started in
the same actor system as Spark's own. And since that actor
system is started as soon as the executor is launched, even before the
application code is loaded, there isn't much classloader magic that can be
done.

However, I think a solution could be to create a new actor system for this
purpose. It would be created when the receiver is started and stopped when
the receiver exits. That should not be a big change, though the
corner cases of shutting down and starting actor systems (as receivers get
relaunched) and of configuring that actor system (as it is different from
Spark's actor system) need to be handled carefully.

It's best to consider it a bug. Could you make a JIRA for it? And maybe
fix it as well ;) ?

TD

