Posted to dev@spark.apache.org by Madhu <ma...@madhu.com> on 2014/05/20 15:25:33 UTC

Sorting partitions in Java

I'm trying to sort data in each partition of an RDD.
I was able to do it successfully in Scala like this:

val sorted = rdd.mapPartitions(iter => {
  iter.toArray.sortWith((x, y) => x._2.compare(y._2) < 0).iterator
},
preservesPartitioning = true)

I used the same technique as in OrderedRDDFunctions.scala, so I assume it's
a reasonable way to do it.

This works well so far, but I can't seem to do the same thing in Java
because 'iter' in the Java APIs is an Iterator rather than an Iterable.
There may be an unattractive workaround, but I didn't pursue it.

Ideally, it would be nice to have an efficient, robust method in RDD to sort
each partition.
Does something like that exist?

Thanks!



-----
--
Madhu
https://www.linkedin.com/in/msiddalingaiah
--
View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Sorting-partitions-in-Java-tp6715.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.

Re: Sorting partitions in Java

Posted by Andrew Ash <an...@andrewash.com>.
Voted :)

https://issues.apache.org/jira/browse/SPARK-983


On Tue, May 20, 2014 at 10:21 AM, Sandy Ryza <sa...@cloudera.com> wrote:

> There is: SPARK-545

Re: Sorting partitions in Java

Posted by Sandy Ryza <sa...@cloudera.com>.
There is: SPARK-545


On Tue, May 20, 2014 at 10:16 AM, Andrew Ash <an...@andrewash.com> wrote:

> Sandy, is there a Jira ticket for that?

Re: Sorting partitions in Java

Posted by Andrew Ash <an...@andrewash.com>.
Sandy, is there a Jira ticket for that?


On Tue, May 20, 2014 at 10:12 AM, Sandy Ryza <sa...@cloudera.com> wrote:

> sortByKey currently requires partitions to fit in memory, but there are
> plans to add external sort

Re: Sorting partitions in Java

Posted by Sandy Ryza <sa...@cloudera.com>.
sortByKey currently requires partitions to fit in memory, but there are
plans to add an external sort.


On Tue, May 20, 2014 at 10:10 AM, Madhu <ma...@madhu.com> wrote:

> Thanks Sean, I had seen that post you mentioned.
>
> What you suggest looks like an in-memory sort, which is fine if each partition
> is small enough to fit in memory. Is it true that rdd.sortByKey(...) requires
> partitions to fit in memory? I wasn't sure if there was some magic behind
> the scenes that supports arbitrarily large sorts.

Re: Sorting partitions in Java

Posted by Madhu <ma...@madhu.com>.
Sean,

No, I don't want to sort the whole RDD; sortByKey seems good enough
for that.

Right now, I think the code I have will work for me, but I can imagine
conditions where it will run out of memory.

I'm not completely sure whether SPARK-983
<https://issues.apache.org/jira/browse/SPARK-983>, which Andrew mentioned, covers
the rdd.sortPartitions() use case. Can someone comment on the scope of
SPARK-983?

Thanks!




Re: Sorting partitions in Java

Posted by Sean Owen <so...@cloudera.com>.
On Tue, May 20, 2014 at 6:10 PM, Madhu <ma...@madhu.com> wrote:
> What you suggest looks like an in-memory sort, which is fine if each partition is
> small enough to fit in memory. Is it true that rdd.sortByKey(...) requires
> partitions to fit in memory? I wasn't sure if there was some magic behind
> the scenes that supports arbitrarily large sorts.

Yes, but so did the Scala version you posted -- I assumed that was OK
for your use case. Regardless of what Spark does, you would copy all
values into memory with toArray.

sortByKey is something fairly different. It sorts the whole RDD by
key, not values within each key. I think Sandy is talking about
something related but not quite the same.

Do you really mean you want to sort the whole RDD?
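To make the distinction Sean draws concrete, here is a small sketch in plain Java with no Spark dependency. It is only an illustration under stated assumptions: each inner list stands in for one RDD partition, and the class and method names are hypothetical. Sorting within partitions keeps the partition boundaries, so the concatenated output is not globally ordered, while a whole-dataset sort produces one total order.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SortScopes {

    // Sorts each inner list independently and keeps the partition boundaries:
    // the per-partition behaviour this thread is asking about.
    public static List<List<Integer>> sortWithinPartitions(List<List<Integer>> partitions) {
        List<List<Integer>> out = new ArrayList<>();
        for (List<Integer> p : partitions) {
            List<Integer> copy = new ArrayList<>(p);
            Collections.sort(copy);
            out.add(copy);
        }
        return out;
    }

    // Produces a single total order over all elements, i.e. the result a
    // whole-dataset sort (such as sortByKey) yields when its output
    // partitions are read back in partition order.
    public static List<Integer> sortGlobally(List<List<Integer>> partitions) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> p : partitions) {
            all.addAll(p);
        }
        Collections.sort(all);
        return all;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(List.of(5, 1, 9), List.of(4, 8, 2));
        System.out.println(sortWithinPartitions(partitions)); // [[1, 5, 9], [2, 4, 8]]
        System.out.println(sortGlobally(partitions));         // [1, 2, 4, 5, 8, 9]
    }
}
```

Note that 5 precedes 2 in the per-partition result once the partitions are concatenated; only the global sort guarantees order across partition boundaries.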

Re: Sorting partitions in Java

Posted by Madhu <ma...@madhu.com>.
Thanks Sean, I had seen that post you mentioned.

What you suggest looks like an in-memory sort, which is fine if each partition is
small enough to fit in memory. Is it true that rdd.sortByKey(...) requires
partitions to fit in memory? I wasn't sure if there was some magic behind
the scenes that supports arbitrarily large sorts.

None of this is a show stopper, it just might require a little more code on
the part of the developer. If there's a requirement for Spark partitions to
fit in memory, developers will have to be aware of that and plan
accordingly. One nice feature of Hadoop MR is the ability to sort very large
sets without thinking about data size.

In the case that a developer repartitions an RDD such that some partitions
don't fit in memory, sorting those partitions requires more work. For these
cases, I think there is value in having a robust partition sorting method
that deals with it efficiently and reliably.

Is there another solution for sorting arbitrarily large partitions? If not,
I don't mind developing and contributing a solution.
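For partitions that don't fit in memory, the standard technique is an external merge sort, the same general idea as Hadoop MR spilling sorted runs to disk and merging them. The sketch below illustrates that technique under stated assumptions and is not Spark code: it handles ints only, spills sorted runs of at most maxInMemory values to temporary files (one value per line), and streams the k-way merge into a caller-supplied IntConsumer. ExternalPartitionSort and its methods are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.IntConsumer;

public class ExternalPartitionSort {

    // Sorts an arbitrarily long stream of ints while buffering at most
    // maxInMemory values: each full buffer is sorted and spilled to a temp
    // file ("run"), then all runs are merged into the sink in ascending order.
    public static void sort(Iterator<Integer> input, int maxInMemory, IntConsumer sink)
            throws IOException {
        List<Path> runs = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();
        try {
            while (input.hasNext()) {
                buffer.add(input.next());
                if (buffer.size() >= maxInMemory) {
                    runs.add(spill(buffer));
                    buffer.clear();
                }
            }
            if (!buffer.isEmpty()) {
                runs.add(spill(buffer));
            }
            merge(runs, sink);
        } finally {
            for (Path run : runs) {
                Files.deleteIfExists(run); // clean up temp files even on failure
            }
        }
    }

    // Sorts one buffer in memory and writes it to a temp file, one value per line.
    private static Path spill(List<Integer> buffer) throws IOException {
        Collections.sort(buffer);
        Path run = Files.createTempFile("sort-run-", ".txt");
        try (BufferedWriter w = Files.newBufferedWriter(run, StandardCharsets.UTF_8)) {
            for (int v : buffer) {
                w.write(Integer.toString(v));
                w.newLine();
            }
        }
        return run;
    }

    // k-way merge: the min-heap holds only the current head {value, runIndex}
    // of each run, and the smallest head is emitted and replaced from its run.
    private static void merge(List<Path> runs, IntConsumer sink) throws IOException {
        List<BufferedReader> readers = new ArrayList<>();
        PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        try {
            for (int i = 0; i < runs.size(); i++) {
                BufferedReader r = Files.newBufferedReader(runs.get(i), StandardCharsets.UTF_8);
                readers.add(r);
                String first = r.readLine();
                if (first != null) {
                    heap.add(new int[] {Integer.parseInt(first), i});
                }
            }
            while (!heap.isEmpty()) {
                int[] head = heap.poll();
                sink.accept(head[0]);
                String next = readers.get(head[1]).readLine();
                if (next != null) {
                    heap.add(new int[] {Integer.parseInt(next), head[1]});
                }
            }
        } finally {
            for (BufferedReader r : readers) {
                r.close();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<Integer> partition = List.of(9, 3, 7, 1, 8, 2, 6, 4, 5, 0);
        List<Integer> sorted = new ArrayList<>();
        sort(partition.iterator(), 3, sorted::add); // 4 runs: 3 + 3 + 3 + 1 values
        System.out.println(sorted); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Memory use is bounded by maxInMemory values plus one reader per run, at the cost of disk I/O. A production version would also need a serializer for arbitrary record types and a limit on how many runs are merged at once.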





Re: Sorting partitions in Java

Posted by Sean Owen <so...@cloudera.com>.
It's an Iterator in both Java and Scala. In both cases you need to
copy the stream of values into something List-like to sort it. An
Iterable would not change that (not sure the API can promise many
iterations anyway).

If you just want the equivalent of "toArray", you can use a utility
method in Commons Collections or Guava. Guava's
Lists.newArrayList(Iterator) works nicely; you can then
Collections.sort() the resulting list with a Comparator and return its iterator().
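A minimal plain-JDK sketch of that recipe, assuming Java 9+ for List.of and Map.entry. An ArrayList filled in a loop stands in for Guava's Lists.newArrayList(Iterator), and Map.Entry stands in for Spark's Tuple2, so it runs without Spark or Guava on the classpath. The helper name sortPartition is hypothetical. Like the Scala toArray version, it holds the whole partition in memory.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class PartitionSort {

    // Copies one partition's Iterator into a List (what Guava's
    // Lists.newArrayList(Iterator) does), sorts it with the given Comparator,
    // and returns an Iterator over the sorted copy. The whole partition is
    // held in memory, just as with toArray in the Scala version.
    public static <T> Iterator<T> sortPartition(Iterator<T> iter, Comparator<? super T> cmp) {
        List<T> buffer = new ArrayList<>();
        while (iter.hasNext()) {
            buffer.add(iter.next());
        }
        Collections.sort(buffer, cmp);
        return buffer.iterator();
    }

    public static void main(String[] args) {
        // Map.Entry stands in for Spark's Tuple2; ordering by value mirrors
        // x._2.compare(y._2) in the Scala snippet.
        List<Map.Entry<String, Integer>> partition =
                List.of(Map.entry("a", 3), Map.entry("b", 1), Map.entry("c", 2));
        Comparator<Map.Entry<String, Integer>> byValue = Map.Entry.comparingByValue();
        Iterator<Map.Entry<String, Integer>> sorted = sortPartition(partition.iterator(), byValue);
        while (sorted.hasNext()) {
            System.out.println(sorted.next()); // b=1, then c=2, then a=3
        }
    }
}
```

Inside a Java mapPartitions function, the body would call sortPartition on the partition's iterator. Depending on the Spark version, the Java FlatMapFunction returns an Iterable (1.x) or an Iterator (2.x and later), so with the 1.x API you would return the sorted list itself rather than its iterator.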

I dug this up too, remembering a similar question:
http://mail-archives.apache.org/mod_mbox/spark-user/201312.mbox/%3C529F819F.3060901@vu.nl%3E

On Tue, May 20, 2014 at 2:25 PM, Madhu <ma...@madhu.com> wrote:
> This works well so far, but I can't seem to do the same thing in Java
> because 'iter' in the Java APIs is an Iterator rather than an Iterable.