Posted to user@spark.apache.org by Sampo Niskanen <sa...@wellmo.com> on 2014/02/26 08:44:36 UTC

Implementing a custom Spark shell

Hi,

I'd like to create a custom version of the Spark shell that automatically
defines some additional variables / RDDs (in addition to 'sc') specific to
our application.  Is this possible?

I took a look at the code that the spark-shell invokes, and it seems quite
complex.  Can this be reused from my code?


I'm implementing a standalone application that uses the Spark libraries
(managed by SBT).  Ideally, I'd like to be able to launch the shell from
that application, instead of using the default Spark distribution.
Alternatively, can some utility code be injected into the standard
spark-shell?


Thanks.

*    Sampo Niskanen*

*Lead developer / Wellmo*

Re: Implementing a custom Spark shell

Posted by Sampo Niskanen <sa...@wellmo.com>.
Hi,

I've tried enabling debug logging, but I can't figure out what might be
going wrong.  Can anyone help decipher the log?

The log of the startup and run attempts is at http://pastebin.com/XyeY92VF
It was produced using SparkILoop, DEBUG-level logging and the
settings.debug.value = true option.

Line 323:  Spark welcome message
Line 746:  The NullPointerException that occurs during startup whenever I
use SparkILoop instead of ILoop
Lines 1973-2252:  Running an RDD count, which works correctly
Lines 2254-2890:  Running an RDD filter + count, which fails due to a
ClassNotFoundException (line 2528)
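
For reference, the two debugging switches mentioned above (DEBUG-level
logging for the REPL classes and settings.debug.value) could be turned on
programmatically roughly as follows.  This is only a sketch: it assumes
log4j 1.2 is the logging backend and that scala-compiler is on the
classpath, and the object name ReplDebug is made up for the example.

```scala
import org.apache.log4j.{Level, Logger}
import scala.tools.nsc.Settings

// Hypothetical helper; not part of Spark or of the application in this thread.
object ReplDebug {
  // Raises the log level for Spark's REPL package and returns compiler
  // settings with the interpreter's own debug output enabled.
  def configure(): Settings = {
    // Logger name is the package that contains SparkILoop and SparkIMain.
    Logger.getLogger("org.apache.spark.repl").setLevel(Level.DEBUG)

    val settings = new Settings
    settings.debug.value = true   // same switch as "settings.debug.value = true" above
    settings
  }
}
```

The returned Settings object would then be passed to the loop's process
method, as in the Repl.scala frame visible in the traces.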


Thanks.


*    Sampo Niskanen*

*Lead developer / Wellmo*
    sampo.niskanen@wellmo.com
    +358 40 820 5291



>> On Wed, Feb 26, 2014 at 10:24 AM, Matei Zaharia <ma...@gmail.com> wrote:
>>
>>> In Spark 0.9 and master, you can pass the -i argument to spark-shell to
>>> load a script containing commands before opening the prompt. This is also a
>>> feature of the Scala shell as a whole (try scala -help for details).
>>>
>>> Also, once you're in the shell, you can use :load file.scala to execute
>>> the content of file.scala as if you'd typed it into the shell.
>>>
>>> Matei
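
As an illustration of the -i option described above, a startup script might
look like the sketch below.  The file name init.scala and the RDD names are
assumptions made for the example, not something from this thread; only sc
is provided by the shell itself.

```scala
// init.scala (hypothetical file name): executed by the shell before the
// prompt opens.  `sc` is the SparkContext that spark-shell defines;
// everything else here is illustrative.
val numbers = sc.parallelize(1 to 100)    // example RDD
val evens = numbers.filter(_ % 2 == 0)    // derived RDD, usable at the prompt
println("Predefined RDDs: numbers, evens")
```

It would be started with "spark-shell -i init.scala", or loaded from a
running shell with ":load init.scala".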

Re: Implementing a custom Spark shell

Posted by Prashant Sharma <sc...@gmail.com>.
You can enable debug logging for the REPL; thankfully it uses Spark's logging
framework. The trouble must be with the wrappers.

Prashant Sharma



Re: Implementing a custom Spark shell

Posted by Sampo Niskanen <sa...@wellmo.com>.
Hi,

Thanks for the pointers.  I did get my code working within the normal
spark-shell.  However, since I'm building a separate analysis service which
pulls in the Spark libraries using SBT, I'd much rather have the custom
shell incorporated in that, instead of having to use the default
downloadable distribution.


I figured out how to create a custom Scala REPL using the instructions at
http://stackoverflow.com/questions/18628516/embedded-scala-repl-interpreter-example-for-2-10
(The latter answer is my helper class that I use.)
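
A minimal sketch of such an embedded REPL for Scala 2.10 is shown below.  It
is not the helper class referred to above; the object name, the bound
variable and its value are placeholders, and the ILoop API used here is the
2.10 one (later Scala versions differ).

```scala
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.ILoop

// Hypothetical example object, written against the Scala 2.10 REPL API.
object EmbeddedRepl {
  // Compiler settings for a REPL embedded in an application.
  def replSettings(): Settings = {
    val settings = new Settings
    settings.usejavacp.value = true   // let the REPL see the JVM classpath
    settings.Yreplsync.value = true   // initialise the interpreter synchronously
    settings
  }

  def main(args: Array[String]): Unit = {
    val repl = new ILoop {
      // Bind application values once the interpreter exists, just before
      // the prompt loop starts.
      override def loop(): Unit = {
        intp.bind("answer", "Int", 42)
        super.loop()
      }
    }
    repl.process(replSettings())   // blocks until the user quits the REPL
  }
}
```

A SparkContext or an RDD would be bound the same way, by passing its full
type name as the second argument of bind.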

I injected the SparkContext and my RDDs, and for example rdd.count works
fine.  However, when I try to perform a filter operation, I get a
ClassNotFoundException [1].  My guess is that the inline function I define
is created only within the REPL, and does not get sent to the processors
(even though I'm using a local cluster).
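
The guess above can be illustrated without Spark.  The sketch below
serializes a closure and then deserializes it through two class loaders,
resolving classes by name in the same way as the
JavaDeserializationStream.resolveClass frame in trace [1].  The isolated
loader is only a stand-in for an executor that has no access to the classes
the REPL generates; all names are made up for the example, and this is not
Spark's actual code path.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream,
  ObjectOutputStream, ObjectStreamClass}
import java.net.{URL, URLClassLoader}

// Hypothetical demonstration object; no Spark dependency.
object ClosureLoading {
  // Serialize an object with plain Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Deserialize, resolving every class by name through the given loader.
  def deserialize(bytes: Array[Byte], loader: ClassLoader): AnyRef = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try in.readObject() finally in.close()
  }

  // True if the bytes can be deserialized through `loader`.
  def loadableBy(bytes: Array[Byte], loader: ClassLoader): Boolean =
    try { deserialize(bytes, loader); true }
    catch { case _: ClassNotFoundException => false }

  def main(args: Array[String]): Unit = {
    val hasAt: String => Boolean = _.contains("@")   // compiled into its own class
    val bytes = serialize(hasAt)

    val appLoader = getClass.getClassLoader                    // can see the closure class
    val isolated = new URLClassLoader(Array.empty[URL], null)  // sees JDK classes only

    println("application loader: " + loadableBy(bytes, appLoader))
    println("isolated loader:    " + loadableBy(bytes, isolated))
  }
}
```

The closure's bytes contain only the name of its class, not the class
itself, so whichever JVM deserializes it needs some way to load that class.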

I found out that there's a separate spark-repl library, which contains the
SparkILoop class.  When I replace the ILoop with SparkILoop, I get the
Spark logo + version number, a NullPointerException [2] and then the Scala
prompt.  Still, I get exactly the same ClassNotFoundException when trying
to perform a filter operation.

Can anyone give any pointers on how to get this working?


Best regards,
   Sampo N.



ClassNotFoundException [1]:

scala> data.profile.filter(p => p.email == "sampo.niskanen@mwsoy.com").count
14/02/28 08:49:16 ERROR Executor: Exception in task ID 1
java.lang.ClassNotFoundException: $line9.$read$$iw$$iw$$anonfun$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/02/28 08:49:16 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted: Task 1.0:0 failed 1 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: $anonfun$1)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


NullPointerException [2]:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
java.lang.NullPointerException
at $iwC$$iwC.<init>(<console>:8)
at $iwC.<init>(<console>:14)
at <init>(<console>:16)
at .<init>(<console>:20)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:119)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:118)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:258)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:118)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:903)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:140)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:102)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:920)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
at com.wellmo.reporting.Repl$.run(Repl.scala:30)
at com.wellmo.reporting.WellmoReportingScala.run(WellmoReportingScala.scala:60)
at com.wellmo.reporting.WellmoReportingJava.run(WellmoReportingJava.java:44)
at com.wellmo.reporting.WellmoReportingJava.main(WellmoReportingJava.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sbt.Run.invokeMain(Run.scala:68)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.execute$1(Run.scala:50)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:54)
at sbt.TrapExit$.executeMain$1(TrapExit.scala:33)
at sbt.TrapExit$$anon$1.run(TrapExit.scala:42)

Spark context available as sc.



*    Sampo Niskanen*

*Lead developer / Wellmo*
    sampo.niskanen@wellmo.com
    +358 40 820 5291



On Wed, Feb 26, 2014 at 10:24 AM, Matei Zaharia <ma...@gmail.com> wrote:

> In Spark 0.9 and master, you can pass the -i argument to spark-shell to
> load a script containing commands before opening the prompt. This is also a
> feature of the Scala shell as a whole (try scala -help for details).
>
> Also, once you're in the shell, you can use :load file.scala to execute
> the content of file.scala as if you'd typed it into the shell.
>
> Matei
>
> On Feb 25, 2014, at 11:44 PM, Sampo Niskanen <sa...@wellmo.com>
> wrote:
>
> > Hi,
> >
> > I'd like to create a custom version of the Spark shell, which has
> > automatically defined some other variables / RDDs (in addition to 'sc')
> > specific to our application.  Is this possible?
> >
> > I took a look at the code that the spark-shell invokes, and it seems
> > quite complex.  Can this be reused from my code?
> >
> >
> > I'm implementing a standalone application that uses the Spark libraries
> > (managed by SBT).  Ideally, I'd like to be able to launch the shell from
> > that application, instead of using the default Spark distribution.
> > Alternatively, can some utility code be injected within the standard
> > spark-shell?
> >
> >
> > Thanks.
> >
> >     Sampo Niskanen
> >     Lead developer / Wellmo
> >
> >
>
>

Re: Implementing a custom Spark shell

Posted by Matei Zaharia <ma...@gmail.com>.
In Spark 0.9 and master, you can pass the -i argument to spark-shell to load a script containing commands before opening the prompt. This is also a feature of the Scala shell as a whole (try scala -help for details).

Also, once you’re in the shell, you can use :load file.scala to execute the content of file.scala as if you’d typed it into the shell.

Matei
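
As an illustration of the -i / :load approach described above, here is a minimal sketch of what such a script might look like. The file name init.scala, the sample RDDs, and the helper function are all hypothetical and not taken from the thread; the only thing the script relies on is the 'sc' variable that spark-shell defines before the script is run.

```scala
// init.scala (hypothetical file name)
// Intended to be evaluated inside spark-shell, either at startup with
//   spark-shell -i init.scala
// or from the prompt with
//   :load init.scala
// It assumes `sc` (the SparkContext) has already been defined by the shell.

import org.apache.spark.rdd.RDD

// Example application-specific RDDs; replace with your own data sources.
val numbers: RDD[Int] = sc.parallelize(1 to 100)
val evens: RDD[Int] = numbers.filter(_ % 2 == 0)

// Example helper that becomes available at the prompt after loading.
def sample[T](rdd: RDD[T], n: Int = 5): Seq[T] = rdd.take(n).toSeq
```

After the script has been loaded, names such as numbers, evens and sample should be usable from the prompt in the same way as 'sc'. Note that this only pre-populates the standard spark-shell; it does not by itself address launching the shell from a standalone SBT application.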

On Feb 25, 2014, at 11:44 PM, Sampo Niskanen <sa...@wellmo.com> wrote:

> Hi,
> 
> I'd like to create a custom version of the Spark shell, which has automatically defined some other variables / RDDs (in addition to 'sc') specific to our application.  Is this possible?
> 
> I took a look at the code that the spark-shell invokes, and it seems quite complex.  Can this be reused from my code?
> 
> 
> I'm implementing a standalone application that uses the Spark libraries (managed by SBT).  Ideally, I'd like to be able to launch the shell from that application, instead of using the default Spark distribution.  Alternatively, can some utility code be injected within the standard spark-shell?
> 
> 
> Thanks.
> 
>     Sampo Niskanen
>     Lead developer / Wellmo
> 
>