Posted to user@spark.apache.org by Aniket Bhatnagar <an...@gmail.com> on 2014/12/08 14:53:23 UTC
Programmatically running spark jobs using yarn-client
I am trying to create (yet another) Spark-as-a-service tool that lets you
submit jobs via REST APIs. I think I have nearly gotten it to work, barring a
few issues, some of which seem to be already fixed in 1.2.0 (like SPARK-2889).
However, I have hit a roadblock with the following issue.
I have created a simple Spark job as follows:
class StaticJob {
  import SparkContext._
  override def run(sc: SparkContext): Result = {
    val array = Range(1, 10000000).toArray
    val rdd = sc.parallelize(array)
    val paired = rdd.map(i => (i % 10000, i)).sortByKey()
    val sum = paired.countByKey()
    SimpleResult(sum)
  }
}
When I submit this job programmatically, it gives me a class not found
error:
2014-12-08 05:41:18,421 [Result resolver thread-0] [warn]
o.a.s.s.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0,
localhost.localdomain): java.lang.ClassNotFoundException:
com.blah.server.examples.StaticJob$$anonfun$1
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)
java.security.AccessController.doPrivileged(Native Method)
java.net.URLClassLoader.findClass(URLClassLoader.java:354)
java.lang.ClassLoader.loadClass(ClassLoader.java:425)
java.lang.ClassLoader.loadClass(ClassLoader.java:358)
java.lang.Class.forName0(Native Method)
java.lang.Class.forName(Class.java:270)
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
I decompiled the StaticJob$$anonfun$1 class and it seems to correspond to the
closure 'rdd.map(i => (i % 10000, i))'. I am not sure why this is happening.
Any help will be greatly appreciated.
Re: Programmatically running spark jobs using yarn-client
Posted by Aniket Bhatnagar <an...@gmail.com>.
Thanks Akhil. I was wondering why it was unable to find the class even
though it existed in the same class loader as SparkContext. As a
workaround, I used the following code to add all dependent jars in a
Play Framework application to the Spark context.
import java.net.URLClassLoader
import scala.annotation.tailrec
import org.apache.spark.SparkContext

// Walks up the class loader chain and registers every jar found on a
// URLClassLoader with the given SparkContext.
@tailrec
private def addClassPathJars(sparkContext: SparkContext, classLoader: ClassLoader): Unit = {
  classLoader match {
    case urlClassLoader: URLClassLoader =>
      urlClassLoader.getURLs.foreach { classPathUrl =>
        if (classPathUrl.toExternalForm.endsWith(".jar")) {
          LOGGER.debug(s"Added $classPathUrl to spark context $sparkContext")
          sparkContext.addJar(classPathUrl.toExternalForm)
        } else {
          LOGGER.debug(s"Ignored $classPathUrl while adding to spark context $sparkContext")
        }
      }
    case _ =>
      LOGGER.debug(s"Ignored class loader $classLoader as it is not a URLClassLoader")
  }
  // Continue with the parent loader until the root of the chain is reached.
  if (classLoader.getParent != null) {
    addClassPathJars(sparkContext, classLoader.getParent)
  }
}
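
The workaround above mixes two concerns: finding jars on the class loader
chain and registering them with Spark. A minimal sketch of the first half
on its own is shown here, so it can be exercised without a cluster. The
object name ClassPathJars and the method name collect are hypothetical and
do not come from the thread. The sketch assumes the loaders in the chain
are URLClassLoader instances, which is generally not the case for the
application class loader on Java 9 and later, where it would return an
empty list.

```scala
import java.net.{URL, URLClassLoader}
import scala.annotation.tailrec

// Hypothetical helper (not part of Spark or of the original post).
object ClassPathJars {
  // Walks the class loader chain from `classLoader` up to the root and
  // returns every URL ending in ".jar", in the order encountered.
  def collect(classLoader: ClassLoader): List[URL] = {
    @tailrec
    def loop(current: ClassLoader, acc: List[URL]): List[URL] =
      current match {
        case null => acc
        case urlLoader: URLClassLoader =>
          val jars = urlLoader.getURLs.toList
            .filter(_.toExternalForm.endsWith(".jar"))
          loop(urlLoader.getParent, acc ++ jars)
        case other =>
          // Not a URLClassLoader: nothing to read here, keep walking up.
          loop(other.getParent, acc)
      }
    loop(classLoader, Nil)
  }
}
```

Assuming sc is an existing SparkContext, the result could then be passed to
it with something like
ClassPathJars.collect(getClass.getClassLoader).foreach(url => sc.addJar(url.toExternalForm)).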
Re: Programmatically running spark jobs using yarn-client
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
How are you submitting the job? You need to build a jar of your code (sbt
package will produce one at target/scala-*/projectname-*.jar) and then
pass it when submitting. If you are not using spark-submit, you can
add this jar to the Spark context directly:
sc.addJar("/path/to/target/scala*/projectname*jar")
Thanks
Best Regards