Posted to issues@spark.apache.org by "Mykhaylo Telizhyn (JIRA)" <ji...@apache.org> on 2014/12/11 14:02:14 UTC

[jira] [Updated] (SPARK-4830) Spark Java Application : java.lang.ClassNotFoundException

     [ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
    Description: 
We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see java.lang.ClassNotFoundException errors in the log.


Application Overview:


Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and contains the custom properties we are interested in: 
{code:title=Event.java|borderStyle=solid}
        class Event implements java.io.Externalizable {

            // custom properties

            // custom implementation of writeExternal(), readExternal() methods
        }
{code}
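        For illustration, here is a minimal sketch of what such an Externalizable event could look like (the fields shown are hypothetical; the real properties are elided above). The key point is that whichever JVM deserializes an Event must have this class on its classpath:
{code:title=Event.java (hypothetical sketch)|borderStyle=solid}
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

public class Event implements Externalizable {

    private byte[] payload;   // hypothetical property

    public Event() { }        // Externalizable requires a public no-arg constructor

    public Event(byte[] body) {
        this.payload = body;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(payload.length);
        out.write(payload);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        payload = new byte[in.readInt()];
        in.readFully(payload);
    }
}
{code}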
        We have implemented a custom Spark receiver that receives messages from a RabbitMQ queue by means of a custom consumer (see "Receiving messages by subscription" at https://www.rabbitmq.com/api-guide.html), converts them to our domain event objects (com.xxx.Event), and stores them in Spark memory:
{code:title=RabbitMQReceiver.java|borderStyle=solid}
            byte[] body = // data received from RabbitMQ by our custom consumer
            Event event = new Event(body);
            store(event);  // store into Spark memory
{code}
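        Since the receiver body is elided above, the following is only a sketch of what a Receiver built on the RabbitMQ subscription API could look like (the constructor parameters and queue handling are assumptions, not the original code):
{code:title=RabbitMQReceiver.java (hypothetical sketch)|borderStyle=solid}
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class RabbitMQReceiver extends Receiver<Event> {

    private final String host;
    private final String queue;
    private transient Connection connection;

    public RabbitMQReceiver(String host, String queue) {
        super(StorageLevel.MEMORY_ONLY());
        this.host = host;
        this.queue = queue;
    }

    @Override
    public void onStart() {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // subscribe with auto-ack; every delivery becomes an Event in Spark memory
            channel.basicConsume(queue, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) {
                    store(new Event(body));
                }
            });
        } catch (Exception e) {
            restart("Could not connect to RabbitMQ", e);
        }
    }

    @Override
    public void onStop() {
        try {
            if (connection != null) connection.close();
        } catch (Exception ignored) { }
    }
}
{code}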

        The main program is simple; it just sets up the Spark streaming context:
{code:title=SparkApplication.java|borderStyle=solid}
            SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
            // ship the jar that contains Application (and Event) to the executors
            sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
{code}
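        The streaming context referenced as ssc further below would then be created from this configuration, roughly as follows (the 1-second batch interval is an assumption):
{code:title=SparkApplication.java|borderStyle=solid}
            // org.apache.spark.streaming.api.java.JavaStreamingContext,
            // org.apache.spark.streaming.Duration
            JavaStreamingContext ssc =
                    new JavaStreamingContext(sparkConf, new Duration(1000));  // 1s batches (assumed)
{code}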
        Initialize input streams:
{code:title=SparkApplication.java|borderStyle=solid}
            ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
            // classTag() is a small helper that builds a scala.reflect.ClassTag<Event>
            JavaReceiverInputDStream<Event> events =
                    new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
{code}
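        As an aside, the Java API can also produce this stream directly from the streaming context, which avoids the manual ClassTag plumbing (the host and queue names below are made up):
{code:title=SparkApplication.java (alternative sketch)|borderStyle=solid}
            JavaReceiverInputDStream<Event> events =
                    ssc.receiverStream(new RabbitMQReceiver("rabbit-host", "events"));
{code}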
        Process events:
{code:title=SparkApplication.java|borderStyle=solid}
            events.foreachRDD(rdd -> {
                    rdd.foreachPartition(partition -> {
                            // process partition
                    });
                    // foreachRDD takes a Function<JavaRDD<Event>, Void> in Spark 1.1
                    return null;
            });

            ssc.start();
            ssc.awaitTermination();
{code}

Application submission:

        The application is packaged into a single fat jar using the Maven Shade plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against Spark version 1.1.0.
        We run the application on a Spark 1.1.0 standalone cluster that consists of a driver host, a master host, and two worker hosts. We submit the application from the driver host.
        On one of the workers we see java.lang.ClassNotFoundException errors.

        We can see that the worker has downloaded application.jar and added it to its class loader:

14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader

...........

14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
java.lang.ClassNotFoundException: com.xxx.Event
	at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:344)
	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
	at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
	at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
	at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
	at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
	at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
	at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
	at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
	at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
	at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
	at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
	at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
	at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
	at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
	at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)



We manually checked the jar file, and it does contain the com.xxx.Event class.
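
One way to reproduce that check programmatically (a sketch; the jar path is assumed) is to try to resolve the class from the fat jar alone:
{code:title=JarCheck.java (hypothetical sketch)|borderStyle=solid}
import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

public class JarCheck {
    public static void main(String[] args) throws Exception {
        // null parent class loader: resolution can only succeed if the
        // class really is inside application.jar
        URLClassLoader cl = new URLClassLoader(
                new URL[] { new File("application.jar").toURI().toURL() }, null);
        System.out.println(Class.forName("com.xxx.Event", false, cl));
    }
}
{code}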

> Spark Java Application : java.lang.ClassNotFoundException
> ---------------------------------------------------------
>
>                 Key: SPARK-4830
>                 URL: https://issues.apache.org/jira/browse/SPARK-4830
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.0
>            Reporter: Mykhaylo Telizhyn
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
