Posted to user@spark.apache.org by donhoff_h <16...@qq.com> on 2015/03/25 08:44:29 UTC
Serialization Problem in Spark Program
Hi, experts
I wrote a very simple Spark program to test Kryo serialization. The code is as follows:
import org.apache.spark.{SparkConf, SparkContext}

object TestKryoSerialization {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true") // I use this statement to force checking registration.
    conf.registerKryoClasses(Array(classOf[MyObject]))
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("hdfs://dhao.hrb:8020/user/spark/tstfiles/charset/A_utf8.txt")
    val objs = rdd.map(new MyObject(_, 1)).collect()
    for (x <- objs) {
      x.printMyObject
    }
  }
}
The class MyObject is also very simple and is only used to test serialization:
class MyObject {
  var myStr : String = ""
  var myInt : Int = 0
  def this(inStr : String, inInt : Int) {
    this()
    this.myStr = inStr
    this.myInt = inInt
  }
  def printMyObject {
    println("MyString is : "+myStr+"\tMyInt is : "+myInt)
  }
}
But when I ran the application, it reported the following error:
java.lang.IllegalArgumentException: Class is not registered: dhao.test.Serialization.MyObject[]
Note: To register this class use: kryo.register(dhao.test.Serialization.MyObject[].class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:161)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
I don't understand what causes this problem. I have already used "conf.registerKryoClasses" to register my class. Could anyone help me? Thanks
By the way, the Spark version is 1.3.0.
Re: Serialization Problem in Spark Program
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Awesome.
Thanks
Best Regards
On Fri, Mar 27, 2015 at 7:26 AM, donhoff_h <16...@qq.com> wrote:
> Hi, Akhil
>
> Yes, that is where the problem lies. Thanks very much for pointing out my mistake.
>
> ------------------ Original ------------------
> *From: * "Akhil Das";<ak...@sigmoidanalytics.com>;
> *Send time:* Thursday, Mar 26, 2015 3:23 PM
> *To:* "donhoff_h"<16...@qq.com>;
> *Cc:* "user"<us...@spark.apache.org>;
> *Subject: * Re: Serialization Problem in Spark Program
>
> Try registering your MyObject[] with Kryo.
Re: Serialization Problem in Spark Program
Posted by Akhil Das <ak...@sigmoidanalytics.com>.
Try registering your MyObject[] with Kryo.
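
For readers wondering why the array needs its own registration: on the JVM, an array type is a separate runtime class from its element type, and collect() hands back an Array[MyObject]. The stack trace in the original post shows the failure happening while the executor serializes the task result. The sketch below uses plain Scala only (no Spark or Kryo) to show that distinction; the two-argument MyObject here is a simplified stand-in for the class in the original post, and ArrayClassDemo is a name made up for illustration.

```scala
// Simplified, hypothetical stand-in for the MyObject class from the original post.
class MyObject(val myStr: String, val myInt: Int)

object ArrayClassDemo {
  def main(args: Array[String]): Unit = {
    val elementClass: Class[_] = classOf[MyObject]
    val arrayClass: Class[_]   = classOf[Array[MyObject]]

    // The array type is its own runtime class, so registering only the
    // element class does not cover it.
    println(elementClass == arrayClass)                   // false
    println(arrayClass.isArray)                           // true
    println(arrayClass.getComponentType == elementClass)  // true

    // An Array[MyObject] value (the shape collect() returns) has the array class.
    val collected: Array[MyObject] = Array(new MyObject("a", 1))
    println(collected.getClass == arrayClass)             // true
  }
}
```

With spark.kryo.registrationRequired set to true, each distinct class that gets serialized has to be registered, which is why MyObject[] shows up in the error even though MyObject itself was registered.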
Re: Serialization Problem in Spark Program
Posted by Imran Rashid <ir...@cloudera.com>.
You also need to register *arrays* of MyObject, so change:
conf.registerKryoClasses(Array(classOf[MyObject]))
to
conf.registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]]))
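
Put together with the program from the original post, the change might look like the sketch below. This is only an illustration under some assumptions: the input path is read from args(0) instead of the HDFS URL in the post, the master and application name are assumed to be supplied by spark-submit, and the SparkConf calls are chained for brevity. Other classes may also need registering in a different job, since registrationRequired applies to everything Kryo serializes.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Same shape as MyObject in the original post.
class MyObject {
  var myStr: String = ""
  var myInt: Int = 0
  def this(inStr: String, inInt: Int) = {
    this()
    this.myStr = inStr
    this.myInt = inInt
  }
  def printMyObject(): Unit =
    println("MyString is : " + myStr + "\tMyInt is : " + myInt)
}

object TestKryoSerialization {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fail fast on any class that Kryo has not been told about.
      .set("spark.kryo.registrationRequired", "true")
      // Register the element class and the array class that collect() produces.
      .registerKryoClasses(Array(classOf[MyObject], classOf[Array[MyObject]]))

    val sc = new SparkContext(conf)
    // Assumption: args(0) is the path of a text file to read.
    val objs = sc.textFile(args(0)).map(new MyObject(_, 1)).collect()
    objs.foreach(_.printMyObject())
    sc.stop()
  }
}
```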