You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Tian Zhang (JIRA)" <ji...@apache.org> on 2015/07/08 01:40:04 UTC

[jira] [Commented] (SPARK-3990) kryo.KryoException caused by ALS.trainImplicit in pyspark

    [ https://issues.apache.org/jira/browse/SPARK-3990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14617663#comment-14617663 ] 

Tian Zhang commented on SPARK-3990:
-----------------------------------

Hi, I am using spark streaming 1.3.0 and I use scala. I am hitting a similar issue.
As long as I have List in my class definition, this happens here is the stack trace.
Serialization trace:
underlying (scala.collection.convert.Wrappers$JListWrapper)
bandwidth_kbps (com.oncue.rna.realtime.streaming.models.QosErrorEvent)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:138)
	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
	at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
	... 42 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
15/07/07 22:42:06 INFO spark.SparkContext: Starting job: foreachRDD at QosErrorExtractorJob2.scala:595
15/07/07 22:42:06 WARN scheduler.TaskSetManager: Lost task 10.0 in stage 17.0 (TID 81, ip-10-118-10-71.ec2.internal): org.apache.spark.TaskKilledException
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

> kryo.KryoException caused by ALS.trainImplicit in pyspark
> ---------------------------------------------------------
>
>                 Key: SPARK-3990
>                 URL: https://issues.apache.org/jira/browse/SPARK-3990
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark
>    Affects Versions: 1.1.0
>         Environment: 5 slaves cluster(m3.large) in AWS launched by spark-ec2
> Linux
> Python 2.6.8
>            Reporter: Gen TANG
>            Assignee: Xiangrui Meng
>            Priority: Critical
>              Labels: test
>             Fix For: 1.1.1, 1.2.0
>
>
> When we tried ALS.trainImplicit() in pyspark environment, it only works for iterations = 1. What is more strange, it is that if we try the same code in Scala, it works very well.(I did several test, by now, in Scala ALS.trainImplicit works)
> For example, the following code:
> {code:title=test.py|borderStyle=solid}
>   from pyspark.mllib.recommendation import *
>   r1 = (1, 1, 1.0) 
>   r2 = (1, 2, 2.0) 
>   r3 = (2, 1, 2.0) 
>   ratings = sc.parallelize([r1, r2, r3]) 
>   model = ALS.trainImplicit(ratings, 1) 
> '''by default iterations = 5 or model = ALS.trainImplicit(ratings, 1, 2)'''
> {code}
> It will cause the failed stage at count at ALS.scala:314 Info as:
> {code:title=error information provided by ganglia}
> Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure: Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>         com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>         scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> {code}
> In the log of slave which failed the task, it has:
> {code:title=error information in the log of slave}
> 14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
> com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
> 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:54)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> 	... 36 more
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org