Posted to issues@spark.apache.org by "Mykhaylo Telizhyn (JIRA)" <ji...@apache.org> on 2014/12/11 14:33:13 UTC

[jira] [Updated] (SPARK-4830) Spark Streaming Java Application : java.lang.ClassNotFoundException

     [ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
    Summary: Spark Streaming Java Application : java.lang.ClassNotFoundException  (was: Spark Java Application : java.lang.ClassNotFoundException)

> Spark Streaming Java Application : java.lang.ClassNotFoundException
> -------------------------------------------------------------------
>
>                 Key: SPARK-4830
>                 URL: https://issues.apache.org/jira/browse/SPARK-4830
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.1.0
>            Reporter: Mykhaylo Telizhyn
>
> h4. Application Overview:
>   
>        We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see {{java.lang.ClassNotFoundException}} exceptions in the log.
> Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and carries the custom properties we are interested in:
> {code:title=com.xxx.Event.java|borderStyle=solid}
>         public class Event implements java.io.Externalizable {
>     
>             // custom properties
>             // custom implementation of writeExternal(), readExternal() methods
>         }    
> {code}
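> For illustration, a minimal version of such a class might look like the sketch below (the fields are illustrative placeholders, not our actual properties; note that {{java.io.Externalizable}} requires a public no-arg constructor for deserialization):
> {code:title=com.xxx.Event.java (illustrative sketch)|borderStyle=solid}
>         import java.io.Externalizable;
>         import java.io.IOException;
>         import java.io.ObjectInput;
>         import java.io.ObjectOutput;
>
>         public class Event implements Externalizable {
>
>             private byte[] body;     // raw message payload from RabbitMQ
>             private long timestamp;  // illustrative custom property
>
>             public Event() {
>                 // public no-arg constructor required by Externalizable
>             }
>
>             public Event(byte[] body) {
>                 this.body = body;
>                 this.timestamp = System.currentTimeMillis();
>             }
>
>             @Override
>             public void writeExternal(ObjectOutput out) throws IOException {
>                 out.writeLong(timestamp);
>                 out.writeInt(body.length);
>                 out.write(body);
>             }
>
>             @Override
>             public void readExternal(ObjectInput in) throws IOException {
>                 timestamp = in.readLong();
>                 body = new byte[in.readInt()];
>                 in.readFully(body);
>             }
>         }
> {code}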
>         We have implemented a custom Spark Streaming receiver that receives messages from a RabbitMQ queue by means of a custom consumer (see _"Receiving messages by subscription"_ at https://www.rabbitmq.com/api-guide.html), converts them to our domain event objects ({{com.xxx.Event}}), and stores them in Spark memory:
> {code:title=RabbitMQReceiver.java|borderStyle=solid}
>             byte[] body = // data received from Rabbit using custom consumer
>             Event event = new Event(body);
>             store(event);  // store into Spark
> {code}                
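> The receiver follows the standard Spark 1.1.0 receiver contract ({{Receiver<T>}} with {{onStart()}}/{{onStop()}}). A sketch of how ours is structured, where {{nextDelivery()}} is a hypothetical stand-in for the custom RabbitMQ consumer mentioned above and the storage level shown is an assumption:
> {code:title=RabbitMQReceiver.java (sketch)|borderStyle=solid}
>             import org.apache.spark.storage.StorageLevel;
>             import org.apache.spark.streaming.receiver.Receiver;
>
>             public class RabbitMQReceiver extends Receiver<Event> {
>
>                 public RabbitMQReceiver() {
>                     super(StorageLevel.MEMORY_ONLY()); // storage level is an assumption
>                 }
>
>                 @Override
>                 public void onStart() {
>                     // spawn a thread so onStart() returns immediately, as the receiver contract requires
>                     new Thread(this::consume, "rabbitmq-consumer").start();
>                 }
>
>                 @Override
>                 public void onStop() {
>                     // the consumer loop below checks isStopped() and terminates on its own
>                 }
>
>                 private void consume() {
>                     while (!isStopped()) {
>                         byte[] body = nextDelivery(); // blocking read from the custom consumer
>                         store(new Event(body));       // store into Spark memory
>                     }
>                 }
>
>                 // hypothetical stand-in for the RabbitMQ subscription logic
>                 private byte[] nextDelivery() {
>                     throw new UnsupportedOperationException("wraps the custom RabbitMQ consumer");
>                 }
>             }
> {code}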
>         The main program is simple; it just sets up the Spark streaming context:
> {code:title=Application.java|borderStyle=solid}
>             SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
>             sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());  
>             JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
> {code}
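> (For completeness: as far as we can tell, the Java-friendly equivalent of the {{setJars}} call would be the following, since {{JavaSparkContext.jarOfClass}} returns a {{String[]}} directly:)
> {code:title=Application.java (alternative)|borderStyle=solid}
>             sparkConf.setJars(JavaSparkContext.jarOfClass(Application.class));
> {code}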
>         Initialize input streams:
> {code:title=Application.java|borderStyle=solid}            
>             ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
>             JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
> {code}            
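> ({{classTag}} above is a small helper of ours; a sketch of it, assuming it simply bridges to Scala's reflection API:)
> {code:title=Application.java (helper sketch)|borderStyle=solid}
>             private static <T> scala.reflect.ClassTag<T> classTag(Class<T> clazz) {
>                 return scala.reflect.ClassTag$.MODULE$.apply(clazz);
>             }
> {code}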
>         Process events:
> {code:title=Application.java|borderStyle=solid}            
>             events.foreachRDD(
>                     rdd -> {
>                         rdd.foreachPartition(
>                                 partition -> {
>                                         // process partition
>                                 }
>                         );
>                         return null; // foreachRDD takes a Function<JavaRDD<Event>, Void> in Spark 1.1
>                     });
>                     
>             ssc.start();
>             ssc.awaitTermination();
> {code}
> h4. Application submission:            
>             
>         The application is packaged as a single fat jar using the Maven Shade Plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against Spark version _1.1.0_.
>         We run the application on a Spark _1.1.0_ standalone cluster that consists of a driver host, a master host, and two worker hosts. We submit the application from the driver host.
>         
>         On one of the workers we see {{java.lang.ClassNotFoundException}} exceptions:       
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
> java.lang.ClassNotFoundException: com.xxx.Event
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
>     at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:344)
>     at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
>     at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
>     at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
>     at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
>     at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
>     at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
>     at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
>     at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
>     at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
>     at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
>     at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
>     at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {panel}
>     We see that the worker has downloaded application.jar and added it to its class loader:
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
> 14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
> 14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
> 14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
> {panel}
>   We also manually checked the jar file, and it does contain the {{com.xxx.Event}} class.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org