Posted to user@spark.apache.org by Konstantin Abakumov <ru...@gmail.com> on 2013/11/28 15:49:02 UTC

Spark streaming + kafka 0.7.2 error

Hello!


I am facing a problem using Spark Streaming with a KafkaStream as the input stream.


Spark version: 0.8.0-incubating, with the kafka-0.7.2 that ships with the Spark distribution. Spark is deployed on Amazon EC2 with 2 slaves using the provided scripts.


My application gets an error in distributed mode (stack trace pasted below). As far as I understand, the workers cannot find the class kafka.consumer.ConsumerConnector during job execution, so no data is consumed from the Kafka broker. It's strange, because the kafka jar is on the classpath and packaged with the application, and no problem occurred in local mode: data was consumed and processed correctly.
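
For reference, here is roughly how the stream is set up (a minimal sketch of the 0.8.0 API as I understand it; the master URL, ZooKeeper address, group id, topic name and jar path below are placeholders, not the real values from my application):

    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch only: all names and paths below are placeholders.
    val ssc = new StreamingContext(
      "spark://master:7077",                  // cluster master (placeholder)
      "KafkaTest",                            // application name
      Seconds(2),                             // batch interval
      System.getenv("SPARK_HOME"),
      Seq("target/my-app-assembly-0.1.jar"))  // app assembly bundling kafka 0.7.2

    // kafkaStream uses kafka.consumer.ConsumerConnector internally --
    // the very class the workers fail to load.
    val topics = Map("my-topic" -> 1)         // topic -> number of consumer threads
    val lines = ssc.kafkaStream("zk-host:2181", "my-group", topics)
    lines.print()

    ssc.start()

Passing the assembly jar to the StreamingContext constructor is what should ship it to the workers, which is why the ClassNotFoundException surprises me.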


Then I tried to run the KafkaWordCount example, but hit the same problem, again only in distributed mode. Moreover, the kafka jar was not added to the spark-examples assembly jar or the classpath, so I had to add it to the classpath manually in the run-example script, otherwise the example failed to start at all :)


Unfortunately, I haven't figured out what causes this error yet, so any advice would be helpful.
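
In case it helps with debugging, a quick check along these lines (hypothetical code, not from my application; master URL and jar path are placeholders again) can confirm whether the class is loadable on the workers at all:

    import org.apache.spark.SparkContext

    // Hypothetical diagnostic: connect to the cluster with the same jar list.
    val sc = new SparkContext("spark://master:7077", "classpath-check",
      System.getenv("SPARK_HOME"), Seq("target/my-app-assembly-0.1.jar"))

    // Run a few tasks and ask each worker whether it can load the class
    // named in the NoClassDefFoundError.
    val results = sc.parallelize(1 to 4, 4).map { _ =>
      val host = java.net.InetAddress.getLocalHost.getHostName
      try {
        Class.forName("kafka.consumer.ConsumerConnector")
        host + ": ok"
      } catch {
        case _: ClassNotFoundException => host + ": class missing"
      }
    }.collect()

    results.foreach(println)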


Finally, I tested Spark's master branch, which uses kafka 0.8.0, and, fortunately, the problem did not occur: the KafkaWordCount example works correctly in both local and distributed modes. So the problem appears only with kafka 0.7.2.


We use kafka 0.7 in our project now, but we're going to upgrade to version 0.8 after the first stable Spark release with 0.8.0 support. Is the master branch production-ready in any sense? Can we use it now, or is that not recommended at the moment?


Stack trace:
java.lang.NoClassDefFoundError: Lkafka/consumer/ConsumerConnector;
        at java.lang.Class.getDeclaredFields0(Native Method)
        at java.lang.Class.privateGetDeclaredFields(Class.java:2397)
        at java.lang.Class.getDeclaredField(Class.java:1946)
        at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
        at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
        at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
        at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:602)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:70)
        at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:135)
        at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.ClassNotFoundException: kafka.consumer.ConsumerConnector
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        ... 43 more


-- 
Best regards,
Konstantin Abakumov


Re: Spark streaming + kafka 0.7.2 error

Posted by Konstantin Abakumov <ru...@gmail.com>.
Hello again,

On the spark-developers mailing list I saw a discussion about the v0.8.1-incubating version and decided to try it out. My problem cannot be reproduced there!

There were some commits made regarding streaming and kafka, for example: https://github.com/apache/incubator-spark/commit/c85665157afa75caeac3a91adf97a0edc0cac3a5#diff-abb0256a318439745ab0b343d64b9ba0
I don't understand how they relate to my problem, but it is fixed :)

Maybe this information will be helpful.

—  
Best regards,
Konstantin Abakumov

