Posted to issues@spark.apache.org by "Andrew Ash (JIRA)" <ji...@apache.org> on 2014/11/14 10:08:35 UTC

[jira] [Commented] (SPARK-755) Kryo serialization failing - MLbase

    [ https://issues.apache.org/jira/browse/SPARK-755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14212033#comment-14212033 ] 

Andrew Ash commented on SPARK-755:
----------------------------------

Quick check, [~sparks]: this issue hasn't been updated in quite some time; is it still a problem for you?

I think you may have been bitten by SPARK-2878, which also deals with Kryo serialization failing and has since been fixed.

> Kryo serialization failing - MLbase
> -----------------------------------
>
>                 Key: SPARK-755
>                 URL: https://issues.apache.org/jira/browse/SPARK-755
>             Project: Spark
>          Issue Type: Bug
>          Components: Block Manager, Spark Core
>    Affects Versions: 0.8.0
>            Reporter: Evan Sparks
>
> When I turn on Kryo serialization, I get the following error as I increase the size of my input dataset (from ~10GB to ~100GB). The issue does not manifest when Kryo is turned off.
> I have code that successfully reads files and parses them into an {noformat}RDD[(String,Vector)]{noformat}, which can then be .count()'ed. I then run a .flatMap over it with a function that has the following signature:
> {code}
> def expandData(x: (String, Vector)): Seq[(String, Float, Vector)]
> {code}
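> In outline, the pipeline looks like this (a sketch only; parseLine, master, and inputPath are placeholders for the real MLbase code):
> {code}
> import spark.SparkContext
> import org.mlbase.Vector
>
> val sc = new SparkContext(master, "MLbase")  // master is a placeholder
>
> // Parse each input line into a (label, feature vector) pair.
> // parseLine: String => (String, Vector) stands in for the real parser.
> val data = sc.textFile(inputPath).map(parseLine)
>
> data.count()                          // succeeds, with or without Kryo
>
> // Expand each record into (String, Float, Vector) triples.
> val expanded = data.flatMap(expandData)
>
> expanded.count()                      // crashes when Kryo is enabled
> {code}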
> Running that final .count() crashes; the stack trace of a failed task looks like this:
> {noformat}
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Finished TID 2024 in 23594 ms (progress: 10/1000)
> 13/05/31 00:16:53 INFO scheduler.DAGScheduler: Completed ResultTask(3, 24)
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:175 as TID 2161 on slave 14: ip-10-62-199-77.ec2.internal:40850 (NODE_LOCAL)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:175 as 2832 bytes in 0 ms
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Lost TID 2053 (task 3.0:49)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Loss was due to com.esotericsoftware.kryo.KryoException
> com.esotericsoftware.kryo.KryoException: java.lang.ArrayIndexOutOfBoundsException
> Serialization trace:
> elements (org.mlbase.Vector)
> _3 (scala.Tuple3)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:504)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:571)
>         at spark.KryoSerializationStream.writeObject(KryoSerializer.scala:26)
>         at spark.serializer.SerializationStream$class.writeAll(Serializer.scala:63)
>         at spark.KryoSerializationStream.writeAll(KryoSerializer.scala:21)
>         at spark.storage.BlockManager.dataSerialize(BlockManager.scala:910)
>         at spark.storage.MemoryStore.putValues(MemoryStore.scala:61)
>         at spark.storage.BlockManager.liftedTree1$1(BlockManager.scala:584)
>         at spark.storage.BlockManager.put(BlockManager.scala:580)
>         at spark.CacheManager.getOrCompute(CacheManager.scala:55)
>         at spark.RDD.iterator(RDD.scala:207)
>         at spark.scheduler.ResultTask.run(ResultTask.scala:84)
>         at spark.executor.Executor$TaskRunner.run(Executor.scala:104)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:679)
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:151
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Starting task 3.0:49 as TID 2162 on slave 12: ip-10-11-46-255.ec2.internal:38878 (NODE_LOCAL)
> 13/05/31 00:16:53 INFO cluster.TaskSetManager: Serialized task 3.0:49 as 2832 bytes in 0 ms
> 13/05/31 00:16:53 INFO cluster.ClusterScheduler: parentName:,name:TaskSet_3,runningTasks:152
> 13/05/31 00:16:54 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_7_257 in mem
> {noformat}
> My Kryo Registrator looks like this:
> {code}
> import com.esotericsoftware.kryo.Kryo
> import spark.KryoRegistrator
> import org.mlbase.Vector
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Vector])
>     kryo.register(classOf[String])
>     kryo.register(classOf[Float])
>     kryo.register(classOf[Tuple3[String,Float,Vector]])
>     kryo.register(classOf[Seq[Tuple3[String,Float,Vector]]])
>     kryo.register(classOf[Map[String,Vector]])
>   }
> }
> {code}
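> The registrator is wired up via system properties before the SparkContext is created (a sketch of the standard configuration for the spark.* package names shown in the stack trace above; the fully-qualified class name is assumed):
> {code}
> // Both properties must be set before the SparkContext is constructed.
> System.setProperty("spark.serializer", "spark.KryoSerializer")
> System.setProperty("spark.kryo.registrator", "org.mlbase.MyRegistrator")  // assumed FQCN
> val sc = new SparkContext(master, "MLbase")
> {code}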
> "Vector" in this case is an org.mlbase.Vector, which in this case is a slightly modified version of spark.util.Vector (uses floats instead of Doubles).


