Posted to user@spark.apache.org by ab...@thomsonreuters.com on 2014/09/17 17:37:57 UTC

GroupBy Key and then sort values with the group

Hi Group,

I am quite new to the Spark world. There is a particular use case that I just cannot work out how to accomplish in Spark. I am using Cloudera CDH5/YARN/Java 7.

I have a dataset that has the following characteristics -

A JavaPairRDD that represents the following -

Key => {int ID}
Value => {date effectiveFrom, float value}

Let's say that the data I have is the following -


Partition - 1
[K=> 1, V=> {09-17-2014, 2.8}]
[K=> 1, V=> {09-11-2014, 3.9}]
[K=> 3, V=> {09-18-2014, 5.0}]
[K=> 3, V=> {09-10-2014, 7.4}]


Partition - 2
[K=> 2, V=> {09-13-2014, 2.5}]
[K=> 4, V=> {09-07-2014, 6.2}]
[K=> 2, V=> {09-12-2014, 1.8}]
[K=> 4, V=> {09-22-2014, 2.9}]


Grouping by key gives me the following RDD

Partition - 1
[K=> 1, V=> Iterable({09-17-2014, 2.8}, {09-11-2014, 3.9})]
[K=> 3, V=> Iterable({09-18-2014, 5.0}, {09-10-2014, 7.4})]

Partition - 2
[K=> 2, V=> Iterable({09-13-2014, 2.5}, {09-12-2014, 1.8})]
[K=> 4, V=> Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]

Now I would like to sort the values within each group by effectiveFrom, so that the result looks like this -

Partition - 1
[K=> 1, V=> Iterable({09-11-2014, 3.9}, {09-17-2014, 2.8})]
[K=> 3, V=> Iterable({09-10-2014, 7.4}, {09-18-2014, 5.0})]

Partition - 2
[K=> 2, V=> Iterable({09-12-2014, 1.8}, {09-13-2014, 2.5})]
[K=> 4, V=> Iterable({09-07-2014, 6.2}, {09-22-2014, 2.9})]


What is the best way to do this in Spark? If need be, I can even move "effectiveFrom" (the field that I want to sort on) into the key.

A code snippet or some pointers on how to solve this would be very helpful.

Regards,
Abraham

Re: GroupBy Key and then sort values with the group

Posted by Chinchu Sup <ch...@gmail.com>.
Thanks Davies. I'll try it when it gets released (I am on 1.1.0
currently). For now I am using a custom partitioner with ShuffledRDD
to keep records of the same group together, so I don't have to shuffle
all the data to a single partition.
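
The partitioning idea described above can be sketched in plain Java as follows. This is only an illustration, not the poster's actual code: the class and method names (GroupPartitioning, groupOf, partitionFor) are hypothetical, and the key is assumed to be a String of the form "group::time". In a Spark job, logic like this would presumably live in the getPartition() method of a subclass of org.apache.spark.Partitioner.

```java
class GroupPartitioning {
    // Returns the group part of a composite "group::time" key.
    // A key without the "::" separator is treated as a bare group name.
    static String groupOf(String compositeKey) {
        int sep = compositeKey.indexOf("::");
        return sep < 0 ? compositeKey : compositeKey.substring(0, sep);
    }

    // Derives the partition index from the group part only, so every
    // record of a group lands in the same partition whatever its time.
    static int partitionFor(String compositeKey, int numPartitions) {
        int mod = groupOf(compositeKey).hashCode() % numPartitions;
        // hashCode() may be negative; shift the remainder into [0, numPartitions).
        return mod < 0 ? mod + numPartitions : mod;
    }

    public static void main(String[] args) {
        String[] keys = {"groupA::1", "groupA::2", "groupB::1"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + partitionFor(key, 4));
        }
    }
}
```

Because only the group part is hashed, the time part is free to drive the sort order inside each partition.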

On Thu, Oct 9, 2014 at 2:34 PM, Davies Liu <da...@databricks.com> wrote:

> There is a new API called repartitionAndSortWithinPartitions() in
> master that may help in this case. You would then have to do the
> `groupBy()` yourself.

Re: GroupBy Key and then sort values with the group

Posted by Davies Liu <da...@databricks.com>.
There is a new API called repartitionAndSortWithinPartitions() in
master that may help in this case. You would then have to do the
`groupBy()` yourself.
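
To illustrate what doing the grouping yourself could look like once a partition is sorted, here is a minimal plain-Java sketch. It does not call Spark at all: it assumes the records arrive as an Iterator already sorted by (id, effectiveFrom), as they might inside a mapPartitions() call after repartitionAndSortWithinPartitions(). The class and method names (StreamingGroups, latestValuePerId) are hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StreamingGroups {
    // Walks (id, value) records that are already sorted by id and then by
    // effectiveFrom, and keeps the last value seen for each id. Only one
    // record is held at a time, so a group never has to fit in memory.
    static Map<Integer, Float> latestValuePerId(Iterator<Map.Entry<Integer, Float>> sorted) {
        Map<Integer, Float> latest = new LinkedHashMap<Integer, Float>();
        Integer currentId = null;
        Float currentValue = null;
        while (sorted.hasNext()) {
            Map.Entry<Integer, Float> rec = sorted.next();
            // A change of id marks the end of the previous group.
            if (currentId != null && !currentId.equals(rec.getKey())) {
                latest.put(currentId, currentValue);
            }
            currentId = rec.getKey();
            currentValue = rec.getValue();
        }
        // Emit the final group, if there was any input at all.
        if (currentId != null) {
            latest.put(currentId, currentValue);
        }
        return latest;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Float>> recs = new ArrayList<Map.Entry<Integer, Float>>();
        recs.add(new SimpleEntry<Integer, Float>(1, 3.9f)); // 09-11-2014
        recs.add(new SimpleEntry<Integer, Float>(1, 2.8f)); // 09-17-2014
        recs.add(new SimpleEntry<Integer, Float>(3, 7.4f)); // 09-10-2014
        recs.add(new SimpleEntry<Integer, Float>(3, 5.0f)); // 09-18-2014
        System.out.println(latestValuePerId(recs.iterator()));
    }
}
```

The per-group work here (keeping the latest value) is just a stand-in; any computation that can be done in one pass over a sorted group fits the same loop.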

On Wed, Oct 8, 2014 at 4:03 PM, chinchu <ch...@gmail.com> wrote:
> Sean,
>
> I am having a similar issue, but I have a lot of data per group and I cannot
> materialize the Iterable into a List or Seq in memory. [I tried, and it runs
> into OOM.] Is there any other way to do this?
>
> I also tried a secondary sort, with a key of the form "group::time", but
> the problem with that is that the same group name ends up in multiple partitions,
> so I have to run sortByKey with one partition, sortByKey(true, 1), which
> shuffles a lot of data.
>
> Thanks,
> -C

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: GroupBy Key and then sort values with the group

Posted by chinchu <ch...@gmail.com>.
Sean,

I am having a similar issue, but I have a lot of data per group and I cannot
materialize the Iterable into a List or Seq in memory. [I tried, and it runs
into OOM.] Is there any other way to do this?

I also tried a secondary sort, with a key of the form "group::time", but
the problem with that is that the same group name ends up in multiple partitions,
so I have to run sortByKey with one partition, sortByKey(true, 1), which
shuffles a lot of data.

Thanks,
-C



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GroupBy-Key-and-then-sort-values-with-the-group-tp14455p15990.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
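
The ordering that a secondary sort of this kind relies on can be sketched as a composite key that compares by group first and by time second. This is a plain-Java illustration with hypothetical names (GroupTimeKey, SecondarySortOrder), and it assumes the time is held as a long rather than embedded in a "group::time" string. It shows the ordering only and does not address the partitioning problem described in the message.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical composite key: a group name plus a timestamp.
class GroupTimeKey implements Comparable<GroupTimeKey>, Serializable {
    final String group;
    final long time;

    GroupTimeKey(String group, long time) {
        this.group = group;
        this.time = time;
    }

    // Orders by group first, then by time within the same group.
    @Override
    public int compareTo(GroupTimeKey other) {
        int byGroup = group.compareTo(other.group);
        return byGroup != 0 ? byGroup : Long.compare(time, other.time);
    }

    @Override
    public String toString() {
        return group + "::" + time;
    }
}

class SecondarySortOrder {
    public static void main(String[] args) {
        List<GroupTimeKey> keys = new ArrayList<GroupTimeKey>();
        keys.add(new GroupTimeKey("b", 2L));
        keys.add(new GroupTimeKey("a", 5L));
        keys.add(new GroupTimeKey("a", 1L));
        // Sorting by the natural order places each group's records
        // next to each other, in time order.
        Collections.sort(keys);
        System.out.println(keys);
    }
}
```

Comparing the time numerically avoids the pitfall of string keys, where "a::10" would sort before "a::9".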


RE: GroupBy Key and then sort values with the group

Posted by ab...@thomsonreuters.com.
Thanks Sean,

Makes total sense. I guess I was so caught up with RDDs and all the wonderful transformations they can do that I did not think about plain old Java Collections.sort(list, comparator).

Thanks,

______________________

Abraham


-----Original Message-----
From: Sean Owen [mailto:sowen@cloudera.com] 
Sent: Wednesday, September 17, 2014 9:37 AM
To: Jacob, Abraham (Financial&Risk)
Cc: user@spark.apache.org
Subject: Re: GroupBy Key and then sort values with the group

You just need to call mapValues() to change your Iterable of things into a sorted Iterable of things for each key-value pair. In that function you write, it's no different from any other Java program. I imagine you'll need to copy the input Iterable into an ArrayList (unfortunately), sort it with whatever Comparator you want, and return the result.


Re: GroupBy Key and then sort values with the group

Posted by Sean Owen <so...@cloudera.com>.
You just need to call mapValues() to change your Iterable of things
into a sorted Iterable of things for each key-value pair. Inside the
function you write, it's no different from any other Java program. I
imagine you'll need to copy the input Iterable into an ArrayList
(unfortunately), sort it with whatever Comparator you want, and return
the result.
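
A minimal sketch of the copy-and-sort step described above, written as plain Java 7 with no Spark dependency. The Reading class, the SortGroupValues class and its helpers are hypothetical names standing in for the {date effectiveFrom, float value} pair from the question. In a real job, sortedCopy() would presumably be called from the function passed to mapValues() on the grouped JavaPairRDD, and Reading would also need to be Serializable.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.List;

// Hypothetical value type standing in for {date effectiveFrom, float value}.
class Reading {
    final Date effectiveFrom;
    final float value;

    Reading(Date effectiveFrom, float value) {
        this.effectiveFrom = effectiveFrom;
        this.value = value;
    }
}

class SortGroupValues {
    // Orders readings by effectiveFrom, oldest first.
    static final Comparator<Reading> BY_EFFECTIVE_FROM = new Comparator<Reading>() {
        @Override
        public int compare(Reading a, Reading b) {
            return a.effectiveFrom.compareTo(b.effectiveFrom);
        }
    };

    // Copies one group's values into an ArrayList, sorts the copy and
    // returns it. The input Iterable is left untouched.
    static List<Reading> sortedCopy(Iterable<Reading> values) {
        List<Reading> copy = new ArrayList<Reading>();
        for (Reading r : values) {
            copy.add(r);
        }
        Collections.sort(copy, BY_EFFECTIVE_FROM);
        return copy;
    }

    // Convenience helper for building dates; month is 1-based here.
    static Date date(int year, int month, int day) {
        return new GregorianCalendar(year, month - 1, day).getTime();
    }

    public static void main(String[] args) {
        // The values of key 1 from the question, in their original order.
        List<Reading> group = new ArrayList<Reading>();
        group.add(new Reading(date(2014, 9, 17), 2.8f));
        group.add(new Reading(date(2014, 9, 11), 3.9f));
        for (Reading r : sortedCopy(group)) {
            System.out.println(r.effectiveFrom + " " + r.value);
        }
    }
}
```

Note that this approach holds one whole group in memory at a time, so it suits groups of modest size.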
