Posted to dev@spark.apache.org by Gerard Maas <ge...@gmail.com> on 2014/07/22 16:20:37 UTC

Using case classes as keys does not seem to work.

Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]

A minimal example:

case class P(name:String)
val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
[Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
(P(bob),1), (P(alice),1), (P(charly),1))

This contrasts with the expected behavior, which should be equivalent to:
sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
Array[(String, Int)] = Array((charly,1), (alice,1), (bob,2))
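For reference, the equality contract that makes this expectation reasonable can be checked in plain Scala, with no Spark involved: the compiler derives equals and hashCode from a case class's constructor fields, so distinct instances with equal fields should collapse under any hash-based keying.

```scala
// Plain-Scala sketch (no Spark) of why a case class should work as a key:
// equals and hashCode are derived from the constructor fields.
case class P(name: String)

val a = P("bob")
val b = P("bob")
assert(a == b)                    // structural equality, not reference equality
assert(a.hashCode == b.hashCode)  // equal values hash equally
assert(a ne b)                    // even though they are distinct instances
```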

Any ideas why this doesn't work?

-kr, Gerard.

Re: Using case classes as keys does not seem to work.

Posted by Gerard Maas <ge...@gmail.com>.
I created https://issues.apache.org/jira/browse/SPARK-2620 to track this.

Maybe useful to know: this is a regression in Spark 1.0.0. I tested the
same sample code on 0.9.1 and it worked (we have several jobs that use case
classes as aggregation keys, so it had better work).
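Until the regression is resolved, one possible workaround is to key on the case class's fields (a primitive or tuple) rather than the case class itself, rebuilding the instances after the reduce. A sketch in plain Scala collections, with groupBy standing in for the shuffle:

```scala
// Hypothetical workaround sketch (plain Scala, mimicking the RDD pipeline):
// reduce on a String key instead of the case class, then rebuild P.
case class P(name: String)

val ps = Seq(P("alice"), P("bob"), P("charly"), P("bob"))

val counts: Map[P, Int] = ps
  .map(p => (p.name, 1))         // primitive key instead of P
  .groupBy(_._1)                 // stand-in for the shuffle
  .map { case (name, vs) => (P(name), vs.map(_._2).sum) }

// counts == Map(P("alice") -> 1, P("bob") -> 2, P("charly") -> 1)
```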

-kr, Gerard.


On Tue, Jul 22, 2014 at 5:37 PM, Gerard Maas <ge...@gmail.com> wrote:

> Yes, right: 'sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect'
> An oversight on my side.
>
> Thanks!, Gerard.
>
>
> On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann <daniel.siegmann@velos.io
> > wrote:
>
>> I can confirm this bug. The behavior for groupByKey is the same as
>> reduceByKey - your example is actually grouping on just the name. Try
>> this:
>>
>> sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
>> res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
>> (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
>> (P(charly),ArrayBuffer(1)))
>>
>>
>> On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Just to narrow down the issue: it looks like the problem is in
>>> 'reduceByKey' and derivatives like 'distinct'.
>>>
>>> groupByKey() seems to work:
>>>
>>> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
>>> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
>>> (alice,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>>>
>>>
>>>
>>> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas <ge...@gmail.com>
>>> wrote:
>>>
>>>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>>>
>>>> A minimal example:
>>>>
>>>> case class P(name:String)
>>>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>>>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>>>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>>>> (P(bob),1), (P(alice),1), (P(charly),1))
>>>>
>>>> This contrasts with the expected behavior, which should be equivalent to:
>>>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) =>
>>>> x+y).collect
>>>> Array[(String, Int)] = Array((charly,1), (alice,1), (bob,2))
>>>>
>>>> Any ideas why this doesn't work?
>>>>
>>>> -kr, Gerard.
>>>>
>>>
>>>
>>
>>
>> --
>> Daniel Siegmann, Software Developer
>> Velos
>> Accelerating Machine Learning
>>
>> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
>> E: daniel.siegmann@velos.io W: www.velos.io
>>
>
>


Re: Using case classes as keys does not seem to work.

Posted by Gerard Maas <ge...@gmail.com>.
Yes, right: 'sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect'
An oversight on my side.

Thanks!, Gerard.


On Tue, Jul 22, 2014 at 5:24 PM, Daniel Siegmann <da...@velos.io>
wrote:

> I can confirm this bug. The behavior for groupByKey is the same as
> reduceByKey - your example is actually grouping on just the name. Try
> this:
>
> sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
> res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
> (P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
> (P(charly),ArrayBuffer(1)))
>
>
> On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas <ge...@gmail.com>
> wrote:
>
>> Just to narrow down the issue: it looks like the problem is in
>> 'reduceByKey' and derivatives like 'distinct'.
>>
>> groupByKey() seems to work:
>>
>> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
>> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
>> (alice,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>>
>>
>>
>> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas <ge...@gmail.com>
>> wrote:
>>
>>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>>
>>> A minimal example:
>>>
>>> case class P(name:String)
>>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>>> (P(bob),1), (P(alice),1), (P(charly),1))
>>>
>>> This contrasts with the expected behavior, which should be equivalent to:
>>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
>>> Array[(String, Int)] = Array((charly,1), (alice,1), (bob,2))
>>>
>>> Any ideas why this doesn't work?
>>>
>>> -kr, Gerard.
>>>
>>
>>
>
>
> --
> Daniel Siegmann, Software Developer
> Velos
> Accelerating Machine Learning
>
> 440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
> E: daniel.siegmann@velos.io W: www.velos.io
>

Re: Using case classes as keys does not seem to work.

Posted by Daniel Siegmann <da...@velos.io>.
I can confirm this bug. The behavior for groupByKey is the same as
reduceByKey - your example is actually grouping on just the name. Try this:

sc.parallelize(ps).map(x=> (x,1)).groupByKey().collect
res1: Array[(P, Iterable[Int])] = Array((P(bob),ArrayBuffer(1)),
(P(bob),ArrayBuffer(1)), (P(alice),ArrayBuffer(1)),
(P(charly),ArrayBuffer(1)))
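The expected (correct) grouping semantics can be sketched with plain Scala collections, no Spark involved, with Seq standing in for ArrayBuffer: grouping by a case class key should yield one entry per distinct value, not one per instance.

```scala
// What groupByKey should produce for case class keys, mimicked with
// ordinary Scala collections:
case class P(name: String)

val pairs = Seq(P("alice"), P("bob"), P("charly"), P("bob")).map(p => (p, 1))

val grouped: Map[P, Seq[Int]] =
  pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2)) }

assert(grouped(P("bob")) == Seq(1, 1))  // one entry for P("bob"), not two
assert(grouped.size == 3)               // alice, bob, charly
```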


On Tue, Jul 22, 2014 at 10:30 AM, Gerard Maas <ge...@gmail.com> wrote:

> Just to narrow down the issue: it looks like the problem is in
> 'reduceByKey' and derivatives like 'distinct'.
>
> groupByKey() seems to work:
>
> sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
> res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
> (alice,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
>
>
>
> On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas <ge...@gmail.com>
> wrote:
>
>> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>>
>> A minimal example:
>>
>> case class P(name:String)
>> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
>> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
>> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
>> (P(bob),1), (P(alice),1), (P(charly),1))
>>
>> This contrasts with the expected behavior, which should be equivalent to:
>> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
>> Array[(String, Int)] = Array((charly,1), (alice,1), (bob,2))
>>
>> Any ideas why this doesn't work?
>>
>> -kr, Gerard.
>>
>
>


-- 
Daniel Siegmann, Software Developer
Velos
Accelerating Machine Learning

440 NINTH AVENUE, 11TH FLOOR, NEW YORK, NY 10001
E: daniel.siegmann@velos.io W: www.velos.io

Re: Using case classes as keys does not seem to work.

Posted by Gerard Maas <ge...@gmail.com>.
Just to narrow down the issue: it looks like the problem is in 'reduceByKey'
and derivatives like 'distinct'.

groupByKey() seems to work:

sc.parallelize(ps).map(x=> (x.name,1)).groupByKey().collect
res: Array[(String, Iterable[Int])] = Array((charly,ArrayBuffer(1)),
(alice,ArrayBuffer(1)), (bob,ArrayBuffer(1, 1)))
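That distinct is affected follows from its shape: Spark implements distinct roughly as map(x => (x, null)).reduceByKey((x, _) => x).map(_._1), so any key-equality problem in reduceByKey surfaces in distinct as well. A plain-Scala sketch of that same shape, with groupBy standing in for the shuffle:

```scala
// Why distinct depends on reduceByKey: it reduces over (x, null) pairs
// and keeps one representative per key. Mimicked with plain collections:
case class P(name: String)

def distinctViaReduce[A](xs: Seq[A]): Seq[A] =
  xs.map(x => (x, null))
    .groupBy(_._1)             // stand-in for the keyed shuffle
    .map { case (k, _) => k }  // keep one representative per key
    .toSeq

assert(distinctViaReduce(Seq(P("bob"), P("bob"), P("alice"))).toSet ==
  Set(P("bob"), P("alice")))
```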



On Tue, Jul 22, 2014 at 4:20 PM, Gerard Maas <ge...@gmail.com> wrote:

> Using a case class as a key doesn't seem to work properly. [Spark 1.0.0]
>
> A minimal example:
>
> case class P(name:String)
> val ps = Array(P("alice"), P("bob"), P("charly"), P("bob"))
> sc.parallelize(ps).map(x=> (x,1)).reduceByKey((x,y) => x+y).collect
> [Spark shell local mode] res : Array[(P, Int)] = Array((P(bob),1),
> (P(bob),1), (P(alice),1), (P(charly),1))
>
> This contrasts with the expected behavior, which should be equivalent to:
> sc.parallelize(ps).map(x=> (x.name,1)).reduceByKey((x,y) => x+y).collect
> Array[(String, Int)] = Array((charly,1), (alice,1), (bob,2))
>
> Any ideas why this doesn't work?
>
> -kr, Gerard.
>
