Posted to user@spark.apache.org by Fanchao Meng <fa...@hotmail.com> on 2016/06/30 07:34:17 UTC

How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

Hi Spark Community,

I am trying to dynamically interpret code given as a String in Spark, just like calling eval in Perl. However, I am running into a problem when running the program. I would really appreciate your help.

**Requirement:**

The requirement is to make the Spark processing chain configurable. For example, a customer could set the processing steps in a configuration file as below. Steps:
     1) textFile("files///<file_full_path>") 
     2) flatMap(line => line.split(" ")) 
     3) map(word => (word, 1)) 
     4) reduceByKey(_ + _) 
     5) foreach(println)

All the above steps are defined in a configuration file.
The Spark driver then loads the configuration file and assembles the processing steps into a string, such as:

     val processFlow =
     """
     sc.textFile("file:///input.txt").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
     """

Then, Spark should execute the piece of code held in the processFlow variable above.
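
For illustration, here is a minimal sketch of how the driver could assemble that string from the configuration. The file layout (one step per line, in order) and the ProcessFlowLoader name are just assumptions for the example, not part of my actual setup:

     import scala.io.Source

     // Hypothetical helper: reads the configured steps (one per line, in
     // order) and joins them into the single expression to be interpreted.
     object ProcessFlowLoader {
       def load(configPath: String): String = {
         val src = Source.fromFile(configPath)
         try {
           val steps = src.getLines().map(_.trim).filter(_.nonEmpty).toSeq
           "sc." + steps.mkString(".")   // e.g. sc.textFile(...).flatMap(...)
         } finally src.close()
       }
     }

     // usage (hypothetical file name):
     // val processFlow = ProcessFlowLoader.load("steps.conf")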

**Here is my Spark source code:**

It is based on the word count sample; I just moved the RDD method chain into a string that is passed to the interpreter.

     import org.apache.spark.SparkConf
     import org.apache.spark.SparkContext
     import scala.collection.mutable.{Map, ArraySeq}
     import scala.tools.nsc.GenericRunnerSettings
     import scala.tools.nsc.interpreter.IMain
     class TestMain {
       def exec(): Unit = {
         val out = System.out
         val flusher = new java.io.PrintWriter(out)
         // Embedded Scala interpreter, using the application classpath
         val interpreter = {
           val settings = new GenericRunnerSettings(println _)
           settings.usejavacp.value = true
           new IMain(settings, flusher)
         }
         val conf = new SparkConf().setAppName("TestMain")
         val sc = new SparkContext(conf)
         // The processing chain, kept as a string to be evaluated at runtime
         val methodChain =
           """
           val textFile = sc.textFile("file:///input.txt")
           textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).foreach(println)
           """
         // Expose the SparkContext to the interpreted code, then evaluate it
         interpreter.bind("sc", sc)
         val resultFlag = interpreter.interpret(methodChain)
       }
     }
     object TestMain {
       def main(args: Array[String]) {
         val testMain = new TestMain()
         testMain.exec()
         System.exit(0)
       }
     }

**Problem:**

However, I got the following error when running the above Spark code (master=local); logs below.

     sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@7d87addd

     org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassNotFoundException: $anonfun$1
             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
             at java.security.AccessController.doPrivileged(Native Method)
             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
             at java.lang.Class.forName0(Native Method)
             at java.lang.Class.forName(Class.java:270)
             at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
             at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
             at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
             at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
             at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
             at org.apache.spark.scheduler.Task.run(Task.scala:89)
             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
             at java.lang.Thread.run(Thread.java:745)
     
      Driver stacktrace:
             at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
             at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
             at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
             at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
             at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
             at scala.Option.foreach(Option.scala:236)
             at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
             at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
             at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
             at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
             at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
             at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
             at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
             at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
             at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
             at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
             at .<init>(<console>:12)
             at .<clinit>(<console>)
             at .<init>(<console>:7)
             at .<clinit>(<console>)
             at $print(<console>)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:606)
             at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
             at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
             at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
             at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
             at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
             at com.tr.ecp.test.TestMain.exec(TestMain.scala:44)
             at com.tr.ecp.test.TestMain$.main(TestMain.scala:57)
             at com.tr.ecp.test.TestMain.main(TestMain.scala)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:606)
             at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
             at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
             at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
             at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
             at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
     Caused by: java.lang.ClassNotFoundException: $anonfun$1
             at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
             at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
             at java.security.AccessController.doPrivileged(Native Method)
             at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
             at java.lang.Class.forName0(Native Method)
             at java.lang.Class.forName(Class.java:270)
             at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:68)
             at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
             at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
             at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
             at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
             at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
             at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
             at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
             at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:115)
             at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
             at org.apache.spark.scheduler.Task.run(Task.scala:89)
             at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
             at java.lang.Thread.run(Thread.java:745)
     


Thanks,
Fanchao

Re: How to use scala.tools.nsc.interpreter.IMain in Spark, just like calling eval in Perl.

Posted by Jayant Shekhar <ja...@gmail.com>.
Hi Fanchao,

This is because the executors are unable to find the anonymous classes generated by the interpreter.

Adding the code below worked for me. I found the details here:
https://github.com/cloudera/livy/blob/master/repl/src/main/scala/com/cloudera/livy/repl/SparkInterpreter.scala

// Spark 1.6 does not have "classServerUri"; instead, the local directory
// where class files are stored needs to be registered in SparkConf. See
// comment in SparkILoop::createSparkContext().
// (Needs java.io.File and scala.util.{Try, Success, Failure} in scope.)

Try(sparkIMain.getClass().getMethod("classServerUri")) match {
  case Success(method) =>
    method.setAccessible(true)
    conf.set("spark.repl.class.uri",
      method.invoke(sparkIMain).asInstanceOf[String])

  case Failure(_) =>
    val outputDir = sparkIMain.getClass().getMethod("getClassOutputDirectory")
    outputDir.setAccessible(true)
    conf.set("spark.repl.class.outputDir",
      outputDir.invoke(sparkIMain).asInstanceOf[File].getAbsolutePath())
}
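
The important part is the ordering: the interpreter's class location has to be on the SparkConf before the SparkContext is created, so the executors' class loader knows where to fetch the generated classes from. Just as a rough sketch of how that could look, wrapped in a helper (this assumes the interpreter is Spark's own SparkIMain, as Livy uses; the plain scala.tools.nsc IMain in your code does not expose either of these methods, so the helper and its name are only illustrative):

import java.io.File
import scala.util.{Failure, Success, Try}
import org.apache.spark.SparkConf

object ReplClassRegistration {
  // Copy the interpreter's class location onto the SparkConf. `interp` is
  // expected to be a SparkIMain instance; plain IMain has neither method,
  // so the Failure branch would also fail there.
  def register(interp: AnyRef, conf: SparkConf): Unit =
    Try(interp.getClass.getMethod("classServerUri")) match {
      case Success(m) =>
        m.setAccessible(true)
        conf.set("spark.repl.class.uri", m.invoke(interp).asInstanceOf[String])
      case Failure(_) =>
        val m = interp.getClass.getMethod("getClassOutputDirectory")
        m.setAccessible(true)
        conf.set("spark.repl.class.outputDir",
          m.invoke(interp).asInstanceOf[File].getAbsolutePath)
    }
}

// Intended use inside exec(): build the interpreter first, then
//   val conf = new SparkConf().setAppName("TestMain")
//   ReplClassRegistration.register(interpreter, conf)
//   val sc = new SparkContext(conf)   // only now create the context
//   interpreter.bind("sc", sc); interpreter.interpret(methodChain)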


Thanks,

Jayant


