Posted to issues@spark.apache.org by "Mykhaylo Telizhyn (JIRA)" <ji...@apache.org> on 2014/12/11 14:33:13 UTC
[jira] [Updated] (SPARK-4830) Spark Streaming Java Application : java.lang.ClassNotFoundException
[ https://issues.apache.org/jira/browse/SPARK-4830?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mykhaylo Telizhyn updated SPARK-4830:
-------------------------------------
Summary: Spark Streaming Java Application : java.lang.ClassNotFoundException (was: Spark Java Application : java.lang.ClassNotFoundException)
> Spark Streaming Java Application : java.lang.ClassNotFoundException
> -------------------------------------------------------------------
>
> Key: SPARK-4830
> URL: https://issues.apache.org/jira/browse/SPARK-4830
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.1.0
> Reporter: Mykhaylo Telizhyn
>
> h4. Application Overview:
>
> We have a Spark Streaming application that consumes messages from RabbitMQ and processes them. When we generate hundreds of events on RabbitMQ and run the application on a Spark standalone cluster, we see {{java.lang.ClassNotFoundException}} exceptions in the log.
> Our domain model is a simple POJO that represents the RabbitMQ events we want to consume and contains the custom properties we are interested in:
> {code:title=com.xxx.Event.java|borderStyle=solid}
> public class Event implements java.io.Externalizable {
>
> // custom properties
> // custom implementation of writeExternal(), readExternal() methods
> }
> {code}
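> Concretely, a minimal sketch of such a class might look like the following (the single {{byte[]}} field and the length-prefixed layout are illustrative assumptions for this report, not our actual implementation):

```java
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

// Illustrative sketch only: the real com.xxx.Event carries custom
// properties that are omitted here. The raw byte[] payload and the
// length-prefixed wire layout are assumptions for the example.
public class Event implements Externalizable {

    private byte[] body;

    // Externalizable deserialization requires a public no-arg constructor.
    public Event() { }

    public Event(byte[] body) {
        this.body = body;
    }

    public byte[] getBody() { return body; }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        out.writeInt(body.length);
        out.write(body);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        body = new byte[in.readInt()];
        in.readFully(body);
    }
}
```

> Note that {{java.io.Externalizable}} classes must have a public no-arg constructor, since the deserializer instantiates the object before calling {{readExternal()}}.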
> We have implemented a custom Spark Streaming receiver that receives messages from a RabbitMQ queue by means of a custom consumer (see _"Receiving messages by subscription"_ at https://www.rabbitmq.com/api-guide.html), converts them to our custom domain event objects ({{com.xxx.Event}}), and stores them in Spark memory:
> {code:title=RabbitMQReceiver.java|borderStyle=solid}
> byte[] body = // data received from RabbitMQ using a custom consumer
> Event event = new Event(body);
> store(event); // store into Spark
> {code}
> The main program is simple; it just sets up the Spark streaming context:
> {code:title=Application.java|borderStyle=solid}
> SparkConf sparkConf = new SparkConf().setAppName(APPLICATION_NAME);
> sparkConf.setJars(SparkContext.jarOfClass(Application.class).toList());
> JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(BATCH_DURATION_MS));
> {code}
> Initialize input streams:
> {code:title=Application.java|borderStyle=solid}
> ReceiverInputDStream<Event> stream = // create input stream from RabbitMQ
> JavaReceiverInputDStream<Event> events = new JavaReceiverInputDStream<Event>(stream, classTag(Event.class));
> {code}
> Process events:
> {code:title=Application.java|borderStyle=solid}
> events.foreachRDD(
>     rdd -> {
>         rdd.foreachPartition(
>             partition -> {
>                 // process partition
>             }
>         );
>     }
> );
>
> ssc.start();
> ssc.awaitTermination();
> {code}
> h4. Application submission:
>
> The application is packaged as a single fat jar file using the maven shade plugin (http://maven.apache.org/plugins/maven-shade-plugin/) and is compiled against Spark version _1.1.0_.
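> For reference, our shade configuration looks roughly like the following minimal sketch (the plugin version and binding shown here are illustrative, not copied from our actual pom.xml):
> {code:title=pom.xml|borderStyle=solid}
> <plugin>
>     <groupId>org.apache.maven.plugins</groupId>
>     <artifactId>maven-shade-plugin</artifactId>
>     <version>2.3</version>
>     <executions>
>         <execution>
>             <!-- bind the shade goal to the package phase to produce the fat jar -->
>             <phase>package</phase>
>             <goals>
>                 <goal>shade</goal>
>             </goals>
>         </execution>
>     </executions>
> </plugin>
> {code}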
> We run the application on a Spark _1.1.0_ standalone cluster that consists of a driver host, a master host, and two worker hosts. We submit the application from the driver host.
>
> On one of the workers we see {{java.lang.ClassNotFoundException}} exceptions:
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:27:10 ERROR BlockManagerWorker: Exception handling buffer message
> java.lang.ClassNotFoundException: com.xxx.Event
> at java.net.URLClassLoader$1.run(URLClassLoader.java:372)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:361)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:360)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:344)
> at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:59)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
> at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
> at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:235)
> at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:126)
> at org.apache.spark.storage.MemoryStore.putIterator(MemoryStore.scala:104)
> at org.apache.spark.storage.MemoryStore.putBytes(MemoryStore.scala:76)
> at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:748)
> at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:639)
> at org.apache.spark.storage.BlockManagerWorker.putBlock(BlockManagerWorker.scala:92)
> at org.apache.spark.storage.BlockManagerWorker.processBlockMessage(BlockManagerWorker.scala:73)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$2.apply(BlockManagerWorker.scala:48)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at org.apache.spark.storage.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at org.apache.spark.storage.BlockMessageArray.map(BlockMessageArray.scala:28)
> at org.apache.spark.storage.BlockManagerWorker.onBlockMessageReceive(BlockManagerWorker.scala:48)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
> at org.apache.spark.storage.BlockManagerWorker$$anonfun$1.apply(BlockManagerWorker.scala:38)
> at org.apache.spark.network.ConnectionManager.org$apache$spark$network$ConnectionManager$$handleMessage(ConnectionManager.scala:682)
> at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:520)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> {panel}
> We see that the worker has downloaded application.jar and added it to its class loader:
> {panel:title=app.log|borderStyle=dashed|borderColor=#ccc|titleBGColor=#e3e4e1|bgColor=#f0f8ff}
> 14/11/27 10:26:59 INFO Executor: Fetching http://xx.xx.xx.xx:38287/jars/application.jar with timestamp 1417084011213
> 14/11/27 10:26:59 INFO Utils: Fetching http://xx.xx.xx.xx:38287/jars/application.jar to /tmp/fetchFileTemp8223721356974787443.tmp
> 14/11/27 10:27:00 INFO BlockManager: Removing RDD 4
> 14/11/27 10:27:01 INFO Executor: Adding file:/path/to/spark/work/app-20141127102651-0001/1/./application.jar to class loader
> {panel}
> We also manually inspected the jar file and confirmed that it contains the {{com.xxx.Event}} class.
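> For completeness, that check can be scripted; a minimal sketch using only the JDK (the {{JarCheck}} class and {{containsClass}} helper are illustrative names, not part of our application):

```java
import java.io.File;
import java.util.jar.JarFile;

// Illustrative helper: returns true if the given jar file contains a
// .class entry for the fully qualified class name.
public class JarCheck {
    public static boolean containsClass(File jar, String className) throws Exception {
        // Class entries are stored with '/' separators and a .class suffix,
        // e.g. com.xxx.Event -> com/xxx/Event.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jf = new JarFile(jar)) {
            return jf.getEntry(entryName) != null;
        }
    }
}
```

> From the command line, {{jar tf application.jar}} piped through grep gives the same information.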
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)