Posted to issues@spark.apache.org by "Deming Zhu (JIRA)" <ji...@apache.org> on 2015/10/29 15:20:27 UTC

[jira] [Comment Edited] (SPARK-5569) Checkpoints cannot reference classes defined outside of Spark's assembly

    [ https://issues.apache.org/jira/browse/SPARK-5569?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14980493#comment-14980493 ] 

Deming Zhu edited comment on SPARK-5569 at 10/29/15 2:20 PM:
-------------------------------------------------------------

For the OffsetRange ClassNotFoundException issue, I'm confident this patch solves it, because the sample code in the Spark documentation cannot actually run (when recovering from a checkpoint) without this patch.

Here's the sample code from the Spark documentation:

{code:title=kafkaSample}
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array[OffsetRange]()

directKafkaStream.transform { rdd =>
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.map {
  ...
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  ...
}
{code}
We can see that offsetRanges is an array, and it needs to be deserialized by the executors.
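
The reason the array matters is a JVM class-loading quirk: on modern JVMs, ClassLoader.loadClass cannot resolve array class names such as "[Lorg.apache.spark.streaming.kafka.OffsetRange;", while Class.forName can. Here is a minimal standalone sketch of the difference (java.lang.String stands in for OffsetRange; this is my own demo, not code from the patch):

{code:title=ArrayClassLoadingDemo.scala}
object ArrayClassLoadingDemo {
  def main(args: Array[String]): Unit = {
    // The binary name ObjectInputStream sees for an array class during
    // deserialization (java.lang.String stands in for OffsetRange here).
    val arrayClassName = "[Ljava.lang.String;"
    val loader = Thread.currentThread().getContextClassLoader

    // Class.forName resolves array class names...
    println(Class.forName(arrayClassName, false, loader))
    // prints: class [Ljava.lang.String;

    // ...but ClassLoader.loadClass throws for them.
    try {
      loader.loadClass(arrayClassName)
    } catch {
      case e: ClassNotFoundException => println(s"loadClass failed: $e")
    }
  }
}
{code}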

For the KafkaRDDPartition ClassNotFoundException issue, since I cannot see the reporter's source code, I cannot be 100% sure that this patch solves it.

But in my opinion, issues of this kind are all caused by Spark trying to load an array class during checkpoint deserialization.
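
For illustration, here is a sketch of the idea behind the patch (my own reconstruction, not the exact Spark code): make the checkpoint reader's resolveClass go through Class.forName with the user class loader, so that array classes resolve as well.

{code:title=resolveClass sketch}
import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

// Illustrative sketch only; the class name and structure are mine,
// not the patch's.
class ObjectInputStreamWithUserLoader(in: InputStream, loader: ClassLoader)
    extends ObjectInputStream(in) {

  override def resolveClass(desc: ObjectStreamClass): Class[_] = {
    try {
      // Class.forName(name, initialize = false, loader) handles both
      // ordinary classes and array classes such as "[Lpkg.OffsetRange;",
      // which ClassLoader.loadClass does not.
      Class.forName(desc.getName, false, loader)
    } catch {
      case _: ClassNotFoundException => super.resolveClass(desc)
    }
  }
}
{code}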

Could anyone affected try this patch and tell us whether your issue is solved?


> Checkpoints cannot reference classes defined outside of Spark's assembly
> ------------------------------------------------------------------------
>
>                 Key: SPARK-5569
>                 URL: https://issues.apache.org/jira/browse/SPARK-5569
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>            Reporter: Patrick Wendell
>
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to create a JIRA to make sure we document this behavior.
> First documented by Cody Koeninger:
> https://gist.github.com/koeninger/561a61482cd1b5b3600c
> {code}
> 15/01/12 16:07:07 INFO CheckpointReader: Attempting to load checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> 15/01/12 16:07:07 WARN CheckpointReader: Error reading checkpoint from file file:/var/tmp/cp/checkpoint-1421100410000.bk
> java.io.IOException: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1043)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData.readObject(DStreamCheckpointData.scala:146)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$readObject$1.apply$mcV$sp(DStreamGraph.scala:180)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         at org.apache.spark.streaming.DStreamGraph.readObject(DStreamGraph.scala:176)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:251)
>         at org.apache.spark.streaming.CheckpointReader$$anonfun$read$2.apply(Checkpoint.scala:239)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
>         at org.apache.spark.streaming.CheckpointReader$.read(Checkpoint.scala:239)
>         at org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:552)
>         at example.CheckpointedExample$.main(CheckpointedExample.scala:34)
>         at example.CheckpointedExample.main(CheckpointedExample.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:365)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.ClassNotFoundException: org.apache.spark.rdd.kafka.KafkaRDDPartition
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:274)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>         at org.apache.spark.streaming.ObjectInputStreamWithLoader.resolveClass(Checkpoint.scala:279)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>         at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1663)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142)
>         at scala.collection.mutable.HashTable$class.init(HashTable.scala:105)
>         at scala.collection.mutable.HashMap.init(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.readObject(HashMap.scala:142)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>         at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
>         at org.apache.spark.streaming.dstream.DStreamCheckpointData$$anonfun$readObject$1.apply$mcV$sp(DStreamCheckpointData.scala:148)
>         at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1040)
>         ... 52 more
> {code}


