Posted to user@spark.apache.org by Robin Cjc <cj...@gmail.com> on 2013/10/31 08:30:53 UTC

Failed to Register spark.kryo.registrator and EOFException

Hi,

I am trying to use Spark 0.8.0 over Cassandra with CQL.

Initially, it works fine when run in local mode with 'master' = 'local[4]' in the SparkContext.

Then I tried to move it onto a standalone cluster. All the nodes run Ubuntu 12.04 with:
Oracle JDK 1.6.0_35
Scala 2.9.3
Cassandra 1.2.10

After running on the cluster, I first got the "unread block data" exception. The suggestion from the Google group was to use the Kryo serializer in place of the default Java serializer. That led to the current problem: the job throws an exception, prints "failed to register spark.kryo.registrator" from KryoSerializer, and then an EOFException, as shown below.

13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0)
13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:109)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:150)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129)
    at java.io.ObjectInputStream.readExternalData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

I set the registrator and the serializer properties as follows:

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", classOf[MyRegistrator].getName)
    val sc = new SparkContext("spark://10.138.39.226:7077", "SC Cluster",
      "/home/robin/workspace/spark/spark-0.8.0-incubating",
      scala.collection.immutable.List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.spark.rdd.RDD[(Map[String, ByteBuffer], Map[String, ByteBuffer])]])
    kryo.register(classOf[String], 1)
    kryo.register(classOf[Map[String, ByteBuffer]], 2)
  }
}
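For comparison, I am also considering the simpler registration form that lets Kryo assign registration IDs automatically instead of hard-coding 1 and 2 (I believe Kryo reserves some low IDs for its default registrations, so explicit low IDs might clash). This is only a sketch using the same class names as above; I have not confirmed it changes the behavior:

```scala
import java.nio.ByteBuffer
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Sketch: same classes as above, but with auto-assigned Kryo IDs.
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[org.apache.spark.rdd.RDD[(Map[String, ByteBuffer], Map[String, ByteBuffer])]])
    kryo.register(classOf[Map[String, ByteBuffer]])
  }
}
```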

The version of Kryo I use is 2.21.

This problem has bothered me for a long time; can you give me some hints about where I went wrong?

Thank you.
Chen Jingci