Posted to user@spark.apache.org by Soila Pertet Kavulya <sk...@gmail.com> on 2015/03/13 02:24:41 UTC

Re: NegativeArraySizeException when doing joins on skewed data

Hi Tristan,

Did upgrading to Kryo3 help?

Thanks,

Soila

On Sun, Mar 1, 2015 at 2:48 PM, Tristan Blakers <tr...@blackfrog.org> wrote:
> Yeah I implemented the same solution. It seems to kick in around the 4B
> mark, but looking at the log I suspect it’s probably a function of the
> number of unique objects more than anything. I definitely don’t have more
> than 2B unique objects.
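
Both traces in this thread fail inside IdentityObjectIntMap.resize, which is reached from MapReferenceResolver.addWrittenObject, so the failure looks tied to the number of objects being tracked rather than to the number of bytes. One plausible reading, which is an assumption and has not been checked against the Kryo source, is that the table capacity is an int that doubles on each resize. Under that assumption the doubling wraps to a negative number once the capacity reaches 2^30, well before 2B entries. The class and method names below are hypothetical and only illustrate the arithmetic:

```java
class CapacityOverflowSketch {
    /** Doubles an int capacity, as a power-of-two hash table commonly does when it grows. */
    static int doubled(int capacity) {
        return capacity << 1;
    }

    /** True if the doubled capacity has wrapped around to a negative int. */
    static boolean doublingOverflows(int capacity) {
        return doubled(capacity) < 0;
    }

    public static void main(String[] args) {
        int capacity = 1 << 30; // largest power of two that is still a positive int
        System.out.println("doubled capacity: " + doubled(capacity));
        try {
            // Allocating with a negative length is what raises the exception.
            Object[] table = new Object[doubled(capacity)];
            System.out.println("allocated " + table.length);
        } catch (NegativeArraySizeException e) {
            System.out.println("caught " + e.getClass().getName());
        }
    }
}
```

If this reading is right, the limit depends on how many distinct objects the serializer has to remember, which would fit the observation that the failure tracks the object count.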
>
>
> Will try the same test on Kryo3 and see if it goes away.
>
> T
>
>
> On 27 February 2015 at 06:21, Soila Pertet Kavulya <sk...@gmail.com>
> wrote:
>>
>> Thanks Tristan,
>>
>> I ran into a similar issue with broadcast variables. I worked around
>> it by estimating the size of the object I want to broadcast, splitting
>> it up into chunks that were less than 2G, then doing multiple
>> broadcasts. This approach worked pretty well for broadcast variables
>> less than 10GB on our system. However, for larger variables, the spills
>> to disk made progress painfully slow, so we need to do regular joins.
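
A minimal sketch of the chunking step described above, written as plain Java with no Spark dependency. It packs items greedily into chunks whose estimated size stays under a byte budget. The class name, the method name and the size estimator are all hypothetical; the real estimate would depend on the objects being broadcast, and the budget would be chosen safely below 2G.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.ToLongFunction;

class BroadcastChunker {
    /**
     * Greedily packs items, in order, into chunks whose estimated total size
     * stays at or below maxChunkBytes. An item that exceeds the budget on its
     * own still gets a chunk to itself.
     */
    static <T> List<List<T>> chunkBySize(List<T> items,
                                         ToLongFunction<? super T> sizeOf,
                                         long maxChunkBytes) {
        List<List<T>> chunks = new ArrayList<>();
        List<T> current = new ArrayList<>();
        long currentBytes = 0L;
        for (T item : items) {
            long size = sizeOf.applyAsLong(item);
            // Close the current chunk if adding this item would exceed the budget.
            if (!current.isEmpty() && currentBytes + size > maxChunkBytes) {
                chunks.add(current);
                current = new ArrayList<>();
                currentBytes = 0L;
            }
            current.add(item);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            chunks.add(current);
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("aaaa", "bb", "cccc", "d", "eeeeee");
        // Hypothetical estimator: one byte per character.
        List<List<String>> chunks = chunkBySize(items, s -> s.length(), 6L);
        System.out.println(chunks);
    }
}
```

Each resulting chunk would then go through its own broadcast call, and the tasks would look a key up across the broadcast chunks.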
>>
>> Do you know if there are any efforts to get Kryo to support objects
>> larger than a couple of GB?
>>
>> Soila
>>
>> On Wed, Feb 25, 2015 at 11:06 PM, Tristan Blakers <tr...@blackfrog.org>
>> wrote:
>> > I get the same exception simply by doing a large broadcast of about 6GB.
>> > Note that I’m broadcasting a small number (~3m) of fat objects. There’s
>> > plenty of free RAM. This and related Kryo exceptions seem to crop up
>> > whenever an object graph of more than a couple of GB gets passed around.
>> >
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:86)
>> >         at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:17)
>> >         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>> >         at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:202)
>> >         at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:101)
>> >         at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:84)
>> >         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>> >         at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29)
>> >         at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>> >         at org.apache.spark.SparkContext.broadcast(SparkContext.scala:945)
>> >         at org.apache.spark.api.java.JavaSparkContext.broadcast(JavaSparkContext.scala:623)
>> >
>> > Caused by: java.lang.NegativeArraySizeException
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>> >         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>> >         at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:539)
>> >         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570)
>> >         ... 23 more
>> >
>> >
>> >
>> >
>> > On 26 February 2015 at 03:49, soila <sk...@gmail.com> wrote:
>> >>
>> >> I have been running into NegativeArraySizeExceptions when doing joins on
>> >> data with very skewed key distributions in Spark 1.2.0. I found a previous
>> >> post that mentioned that this exception arises when the size of the blocks
>> >> spilled during the shuffle exceeds 2GB. The post recommended increasing the
>> >> number of partitions. I tried increasing the number of partitions and using
>> >> the RangePartitioner instead of the HashPartitioner, but still encountered
>> >> the problem.
>> >>
>> >> Does Spark support skewed joins similar to Pig?
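
One common workaround for skewed joins, which is not something proposed in this thread, is key salting: each record on the large side gets a salt appended to its key, and each record on the small side is replicated once per salt value, so a single hot key is spread over several join keys. The sketch below shows only the key transformation on plain in-memory lists; the class name, the "#" separator and the round-robin salt assignment are illustrative assumptions, and in Spark the salted key would become the key of the pair RDDs being joined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SaltedJoinSketch {
    /**
     * Inner-joins (key, value) pairs from a large, skewed side with a small
     * side, spreading every key over `salts` salted keys.
     */
    static List<String> saltedJoin(List<String[]> large, List<String[]> small, int salts) {
        // Replicate each small-side record once per salt value.
        Map<String, List<String>> smallBySaltedKey = new HashMap<>();
        for (String[] kv : small) {
            for (int s = 0; s < salts; s++) {
                smallBySaltedKey
                    .computeIfAbsent(kv[0] + "#" + s, k -> new ArrayList<>())
                    .add(kv[1]);
            }
        }
        // Give each large-side record exactly one salt. Round-robin is used
        // here for determinism; a random salt is the usual choice.
        List<String> joined = new ArrayList<>();
        int next = 0;
        for (String[] kv : large) {
            String saltedKey = kv[0] + "#" + (next++ % salts);
            List<String> matches =
                smallBySaltedKey.getOrDefault(saltedKey, Collections.<String>emptyList());
            for (String right : matches) {
                joined.add(kv[0] + ":" + kv[1] + "," + right);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<String[]> large = Arrays.asList(
            new String[] {"hot", "a"}, new String[] {"hot", "b"},
            new String[] {"hot", "c"}, new String[] {"cold", "x"});
        List<String[]> small = Arrays.asList(
            new String[] {"hot", "H"}, new String[] {"cold", "C"});
        System.out.println(saltedJoin(large, small, 2));
    }
}
```

The join result is the same as for the unsalted keys, at the cost of replicating the small side once per salt value, so this only pays off when one side is much smaller than the other.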
>> >>
>> >>
>> >> com.esotericsoftware.kryo.KryoException: java.lang.NegativeArraySizeException
>> >> Serialization trace:
>> >> otherElements (org.apache.spark.util.collection.CompactBuffer)
>> >>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585)
>> >>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> >>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> >>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501)
>> >>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >>         at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213)
>> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318)
>> >>         at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293)
>> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>         at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
>> >>         at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
>> >>         at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
>> >>         at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:128)
>> >>         at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
>> >>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:176)
>> >>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:63)
>> >>         at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:87)
>> >>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:63)
>> >>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:127)
>> >>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>> >>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>> >>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>> >>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> >>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> >>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>> >>         at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>> >>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>         at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>> >>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>         at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>> >>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>> >>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>> >>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>> >>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>> >>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>> >>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>> >>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>> >>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> >>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> >>         at java.lang.Thread.run(Thread.java:745)
>> >> Caused by: java.lang.NegativeArraySizeException
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
>> >>         at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
>> >>         at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
>> >>         at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
>> >>         at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:499)
>> >>         at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564)
>> >>         ... 46 more
>> >>
>> >>
>> >>
>> >>
>> >> --
>> >> View this message in context:
>> >>
>> >> http://apache-spark-user-list.1001560.n3.nabble.com/NegativeArraySizeException-when-doing-joins-on-skewed-data-tp21802.html
>> >> Sent from the Apache Spark User List mailing list archive at
>> >> Nabble.com.
>> >>
>> >> ---------------------------------------------------------------------
>> >> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>> >> For additional commands, e-mail: user-help@spark.apache.org
>> >>
>> >
>
>
