Posted to dev@spark.apache.org by Andrew Ash <an...@andrewash.com> on 2014/02/19 05:07:51 UTC

Bug in spark.shuffle.spill setting? (0.9.0)

Hi dev list,

I'm running into an issue where I'm seeing different results from Spark
when I run with spark.shuffle.spill=false vs leaving it at the default
(true).

It's on internal data so I can't share my exact repro, but here's roughly
what I'm doing:

val rdd = sc.textFile(...)
  .map(l => ... (col1, col2))  // parse CSV into Tuple2[String,String]
  .distinct
  .join(
    sc.textFile(...)
       .map(l => ... (col1, col2))  // parse CSV into Tuple2[String,String]
       .distinct
  )
  .map{ case (k,(v1,v2)) => Seq(v1,k,v2).mkString("|") }

Then I output:
(rdd.count, rdd.distinct.count)

When I run with spark.shuffle.spill=false I get this:
(3192729,3192729)

And with spark.shuffle.spill=true I get this:
(3192931,3192726)
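
For reference, a self-contained sketch of the same shape (the paths and the
CSV parsing below are placeholders, not my real code, and sc is assumed to be
an existing SparkContext, e.g. the shell's):

import org.apache.spark.SparkContext._   // needed for join() on pair RDDs in 0.9

// Placeholder paths and parsing standing in for the internal data.
val left = sc.textFile("/data/left.csv")
  .map { line =>
    val cols = line.split(",")           // placeholder for the real CSV parsing
    (cols(0), cols(1))                   // Tuple2[String, String]
  }
  .distinct

val right = sc.textFile("/data/right.csv")
  .map { line =>
    val cols = line.split(",")
    (cols(0), cols(1))
  }
  .distinct

val rdd = left.join(right)
  .map { case (k, (v1, v2)) => Seq(v1, k, v2).mkString("|") }

println((rdd.count, rdd.distinct.count))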

Has anyone else seen any bugs in join-heavy operations while using
spark.shuffle.spill=true?

My current theory is that I have a hashCode collision between rows (unusual,
I know) and that AppendOnlyMap determines key equality using hashCode() plus
equals(), while ExternalAppendOnlyMap determines equality based on hashCode()
alone.
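
As a concrete illustration of the kind of collision I mean, using two short
strings that are known to share a JVM hash code:

// "Aa" and "BB" are distinct strings with the same hashCode (2112), so a map
// that compared keys by hashCode() alone would wrongly treat them as equal.
val a = "Aa"
val b = "BB"
println(a.hashCode == b.hashCode)  // true
println(a == b)                    // false -- equals() is what tells them apart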

Would appreciate some additional eyes on this problem for sure.

Right now I'm looking through the source and tests for AppendOnlyMap and
ExternalAppendOnlyMap to see if anything jumps out at me.

Thanks!
Andrew

Re: Bug in spark.shuffle.spill setting? (0.9.0)

Posted by Andrew Or <an...@gmail.com>.
For compressing shuffle spills in 0.9, we added a hack so that spills always
use LZF, so your compression library actually shouldn't matter. We did notice,
however, that Kryo pre-fetches when reading, so reading in batches could cause
some items to be lost; we introduced a Kryo-specific workaround for this.
Although we tested it and the workaround sufficed back then, it is entirely
possible that there are corner cases we missed. In any case, PR #533 (merged
after the 0.9 release) should have taken care of the problem.

If you still run into the same problem on master, then it could be a corner
case that our current way of handling hash collisions missed. When you have
the time, do let us know what you find!
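
For what it's worth, these are the knobs I'd flip between runs when re-testing
on master (a sketch; property names are the 0.9-era ones, and the LZF codec
class name is from memory, so double-check it):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spill-repro")
  .set("spark.shuffle.spill", "true")  // flip to "false" for the comparison run
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // 0.9 forces LZF for spills internally; pinning the codec makes the
  // comparison explicit anyway:
  .set("spark.io.compression.codec", "org.apache.spark.io.LZFCompressionCodec")

val sc = new SparkContext(conf)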

Andrew

Re: Bug in spark.shuffle.spill setting? (0.9.0)

Posted by Andrew Ash <an...@andrewash.com>.
I'm using Kryo with these options:

-Dspark.shuffle.spill=false -Dspark.storage.memoryFraction=0.4
-Dspark.serializer=org.apache.spark.serializer.KryoSerializer
-Dspark.kryo.registrator=com.andrewash.CustomKryoRegistrator
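
The registrator itself is nothing special; it's roughly this shape (the
classes registered here are illustrative placeholders, not my real internal
types):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Roughly the shape of com.andrewash.CustomKryoRegistrator; the registered
// classes below are placeholders for my internal types.
class CustomKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    kryo.register(classOf[Array[String]])
    kryo.register(classOf[(String, String)])
  }
}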

The data is being read from a .lzo file and written back to another .lzo
file, in case that affects things.  Does that cover the compression and
serialization libraries question?

I can give master a shot with my repro but it may be some time now that I
have a workaround.  I'm trying to turn something around quickly and have my
own bugs to debug as well :)

Thanks!
Andrew


Re: Bug in spark.shuffle.spill setting? (0.9.0)

Posted by Andrew Or <an...@gmail.com>.
Looks like you have a large number of distinct keys. As you suspect, this may
be due to hash collisions; there are only about 4 billion possible hash codes.
It could be related to this PR: https://github.com/apache/incubator-spark/pull/612.
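
A rough back-of-the-envelope birthday bound for your key count suggests that a
handful of collisions is expected, not exotic, at this scale:

// Expected number of colliding key pairs when hashing n keys into 2^32
// possible Int hash codes (birthday bound): n * (n - 1) / (2 * 2^32).
val n = 3192729L                 // roughly the distinct-key count you reported
val buckets = 1L << 32           // possible Int hash codes
val expected = n.toDouble * (n - 1) / (2.0 * buckets)
println(f"expected colliding pairs: $expected%.0f")  // on the order of 1000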

The other thing is that we had some issues with the behavior of arbitrary
serialization/compression engines, which is solved in the PR that Mridul
referenced. What compression and serialization libraries are you using?


Re: Bug in spark.shuffle.spill setting? (0.9.0)

Posted by Mridul Muralidharan <mr...@gmail.com>.
I had not resolved it in time for 0.9, but IIRC there was a recent PR
which fixed bugs in spill [1]: are you able to reproduce this with
Spark master?

Regards,
Mridul

[1] https://github.com/apache/incubator-spark/pull/533

Re: Bug in spark.shuffle.spill setting? (0.9.0)

Posted by Andrew Ash <an...@andrewash.com>.
I also confirmed that the spill to disk _was_ occurring:

14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling in-memory
map of 634 MB to disk (1 time so far)
14/02/18 22:50:50 WARN collection.ExternalAppendOnlyMap: Spilling in-memory
map of 581 MB to disk (1 time so far)
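
In case it helps anyone reproduce this on a smaller dataset: shrinking the
shuffle memory fraction should force ExternalAppendOnlyMap to spill much
earlier (property name as of 0.9, if I'm remembering it correctly):

// Force earlier spilling so the ExternalAppendOnlyMap code path is exercised
// even on a small test dataset. Property names as used in 0.9; double-check.
System.setProperty("spark.shuffle.spill", "true")
System.setProperty("spark.shuffle.memoryFraction", "0.05")  // default is higher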

