You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Dominik Safaric <do...@gmail.com> on 2017/03/01 13:19:50 UTC

Spark Streaming - java.lang.ClassNotFoundException Scala anonymous function

I've been trying to submit a Spark Streaming application using spark-submit to a cluster of mine consisting of a master and two worker nodes. The application has been written in Scala, and build using Maven. Importantly, the Maven build is configured to produce a fat JAR containing all dependencies. Furthermore, the JAR has been distributed to all of nodes. The streaming job has been submitted using the following command:

bin/spark-submit --class topology.SimpleProcessingTopology --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar --master spark://10.0.0.8:7077 --verbose /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming-benchmark.properties 
where 10.0.0.8 is the IP address of the master node within the VNET. 

However, I keep getting the following exception while starting the streaming application:

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)

Caused by: java.lang.ClassNotFoundException: topology.SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
I've checked the content of the JAR using jar tvf and as you can see in the output below, it does contain the class in question.

1735 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1.class
   702 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology.class
  2415 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.class
  2500 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1.class
  7045 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$.class
This exception has been caused due to the anonymous function of the foreachPartition call:

rdd.foreachPartition(partition => {
      val outTopic = props.getString("application.simple.kafka.out.topic")
      val producer = new KafkaProducer[Array[Byte],Array[Byte]](kafkaParams)
      partition.foreach(record => {
        val producerRecord = new ProducerRecord[Array[Byte], Array[Byte]](outTopic, record.key(), record.value())
        producer.send(producerRecord)
      })
      producer.close()
    })
Unfortunately, I am not able to find the root cause of this since so far. Hence, I would appreciate if anyone could help me out fixing this issue.


Re: Spark Streaming - java.lang.ClassNotFoundException Scala anonymous function

Posted by Dominik Safaric <do...@gmail.com>.
The jars I am submitting are the following:

bin/spark-submit --class topology.SimpleProcessingTopology --master spark://10.0.0.8:7077 --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp//tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming.properties

I’ve even tried using the spark.executor.extraClassPath option but unfortunately unsuccessfully. 

What do you mean by conflicting copies of Spark classes? Could you elaborate it?

> On 1 Mar 2017, at 14:51, Sean Owen <so...@cloudera.com> wrote:
> 
> What is the --jars you are submitting? You may have conflicting copies of Spark classes that interfere.
> 
> 
> On Wed, Mar 1, 2017, 14:20 Dominik Safaric <dominiksafaric@gmail.com <ma...@gmail.com>> wrote:
> I've been trying to submit a Spark Streaming application using spark-submit to a cluster of mine consisting of a master and two worker nodes. The application has been written in Scala, and build using Maven. Importantly, the Maven build is configured to produce a fat JAR containing all dependencies. Furthermore, the JAR has been distributed to all of nodes. The streaming job has been submitted using the following command:
> 
> bin/spark-submit --class topology.SimpleProcessingTopology --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar --master spark://10.0.0.8:7077 <http://10.0.0.8:7077/> --verbose /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming-benchmark.properties 
> where 10.0.0.8 is the IP address of the master node within the VNET. 
> 
> However, I keep getting the following exception while starting the streaming application:
> 
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> 
> Caused by: java.lang.ClassNotFoundException: topology.SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> I've checked the content of the JAR using jar tvf and as you can see in the output below, it does contain the class in question.
> 
> 1735 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1.class
>    702 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology.class
>   2415 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.class
>   2500 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1.class
>   7045 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$.class
> This exception has been caused due to the anonymous function of the foreachPartition call:
> 
> rdd.foreachPartition(partition => {
>       val outTopic = props.getString("application.simple.kafka.out.topic")
>       val producer = new KafkaProducer[Array[Byte],Array[Byte]](kafkaParams)
>       partition.foreach(record => {
>         val producerRecord = new ProducerRecord[Array[Byte], Array[Byte]](outTopic, record.key(), record.value())
>         producer.send(producerRecord)
>       })
>       producer.close()
>     })
> Unfortunately, I am not able to find the root cause of this since so far. Hence, I would appreciate if anyone could help me out fixing this issue.
> 


Re: Spark Streaming - java.lang.ClassNotFoundException Scala anonymous function

Posted by Sean Owen <so...@cloudera.com>.
What is the --jars you are submitting? You may have conflicting copies of
Spark classes that interfere.

On Wed, Mar 1, 2017, 14:20 Dominik Safaric <do...@gmail.com> wrote:

> I've been trying to submit a Spark Streaming application using
> spark-submit to a cluster of mine consisting of a master and two worker
> nodes. The application has been written in Scala, and build using Maven.
> Importantly, the Maven build is configured to produce a fat JAR containing
> all dependencies. Furthermore, the JAR has been distributed to all of
> nodes. The streaming job has been submitted using the following command:
>
> bin/spark-submit --class topology.SimpleProcessingTopology --jars /tmp/spark_streaming-1.0-SNAPSHOT.jar --master spark://10.0.0.8:7077 --verbose /tmp/spark_streaming-1.0-SNAPSHOT.jar /tmp/streaming-benchmark.properties
>
> where 10.0.0.8 is the IP address of the master node within the VNET.
>
> However, I keep getting the following exception while starting the
> streaming application:
>
> Driver stacktrace:
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
> Caused by: java.lang.ClassNotFoundException: topology.SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>
> I've checked the content of the JAR using jar tvf and as you can see in
> the output below, it does contain the class in question.
>
> 1735 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1.class
>    702 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology.class
>   2415 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.class
>   2500 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$$anonfun$main$1$$anonfun$apply$1.class
>   7045 Wed Mar 01 12:29:20 UTC 2017 topology/SimpleProcessingTopology$.class
>
> This exception has been caused due to the anonymous function of the
> foreachPartition call:
>
> rdd.foreachPartition(partition => {
>       val outTopic = props.getString("application.simple.kafka.out.topic")
>       val producer = new KafkaProducer[Array[Byte],Array[Byte]](kafkaParams)
>       partition.foreach(record => {
>         val producerRecord = new ProducerRecord[Array[Byte], Array[Byte]](outTopic, record.key(), record.value())
>         producer.send(producerRecord)
>       })
>       producer.close()
>     })
>
> Unfortunately, I am not able to find the root cause of this since so far.
> Hence, I would appreciate if anyone could help me out fixing this issue.
>
>