You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Arun Kumar <ar...@gmail.com> on 2013/10/25 11:01:38 UTC

almost sorted data

Hi,

I am trying to process some logs and the data is sorted(*almost*) by
timestamp.
If I do a full sort it takes a lot of time. Is there some way to sort more
efficiently (like restricting sort to per partition).

Thanks in advance

Re: almost sorted data

Posted by Christopher Nguyen <ct...@adatao.com>.
Nathan: why iterator semantics could be more appropriate than a
materialized list: the partition data could be sitting on disk, which could
be streamed into RAM upon access, and which could be left untouched if the
algo decided by some condidtional logic that it didn't need it. And you
could always get to a list from an iterator.

Best,
--
Christopher T. Nguyen
Co-founder & CEO, Adatao <http://adatao.com>
linkedin.com/in/ctnguyen



On Mon, Oct 28, 2013 at 7:44 AM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:

> I'm not sure what you're asking.
>
> At some level, all RDDs only do partition-wise operations - they all only
> operate on one partition at a time.
>
> I suspect what you're looking for is something where you could just write:
>
> data.mapPartitions(_.sortBy(...))
>
> If that's what you want, then no - but only because Iterator has no sortBy
> method.  I'm not sure why mapPartitions hands one an iterator rather than a
> list.  Presumably so one can avoid having to have the whole partition in
> memory at once - but equally presumably, one already has the whole
> partition in memory at once, so that seems odd to me.  Anyone know why?
> Perhaps to allow for worst-case scenarios?
>
>              -Nathan
>
>
>
> On Mon, Oct 28, 2013 at 4:54 AM, Arun Kumar <ar...@gmail.com> wrote:
>
>> I will try using per partition sorted data. Can I also use groupBy and
>> join per partition? Basically I want to restrict the computation per
>> partition like using this data.mapPartitions(_.toList.sortBy(...).toIterator).
>> Is there a more direct way to create a RDD that does partition wise
>> operations?
>>
>>
>> On Sat, Oct 26, 2013 at 3:50 AM, Aaron Davidson <il...@gmail.com>wrote:
>>
>>> Currently, our sortByKey should be using Java's native Timsort
>>> implementation, which is an adaptive sort. That should also mean sorting is
>>> very fast for almost-sorted data. The overhead you're seeing might be
>>> caused by reshuffling everything during the range partitioning step *before
>>> *the sort, which has to serialize all your data.
>>>
>>> Nathan's solution might then work out nicely for you, as it will avoid
>>> shuffling the data.
>>>
>>>
>>> On Fri, Oct 25, 2013 at 9:18 AM, Josh Rosen <ro...@gmail.com>wrote:
>>>
>>>> Adaptive sorting algorithms (
>>>> https://en.wikipedia.org/wiki/Adaptive_sort) can benefit from
>>>> presortedness in their inputs, so that might be a helpful search term
>>>> for researching this problem.
>>>>
>>>>
>>>> On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
>>>> nkronenfeld@oculusinfo.com> wrote:
>>>>
>>>>> I suspect from his description the difference is negligible for his
>>>>> case.  However, there are ways around that anyway.
>>>>>
>>>>> Assuming a fixed data set (as opposed to something like a streaming
>>>>> example, where there is no last element), one can take 3 passes to:
>>>>>
>>>>>    1. get the last element of each partition
>>>>>    2. take elements from each partition that fall before the last
>>>>>    element of the previous partition, separate them from the rest of their
>>>>>    partition
>>>>>    3. and add them to the previous (whichever previous is
>>>>>    appropriate, in really degenerate cases, which it sounds like he doesn't
>>>>>    have) in the right location
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org>wrote:
>>>>>
>>>>>> Using a local sort per partition only gives a correct result if the
>>>>>> data
>>>>>> is already range partitioned.
>>>>>>
>>>>>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>>>>>> > Since no one else has answered...
>>>>>> > I assume:
>>>>>> >
>>>>>> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
>>>>>> >
>>>>>> > would work, but I also suspect there's a better way.
>>>>>> >
>>>>>> >
>>>>>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
>>>>>> wrote:
>>>>>> >
>>>>>> >> Hi,
>>>>>> >>
>>>>>> >> I am trying to process some logs and the data is sorted(*almost*)
>>>>>> by
>>>>>> >> timestamp.
>>>>>> >> If I do a full sort it takes a lot of time. Is there some way to
>>>>>> sort more
>>>>>> >> efficiently (like restricting sort to per partition).
>>>>>> >>
>>>>>> >> Thanks in advance
>>>>>> >>
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Nathan Kronenfeld
>>>>> Senior Visualization Developer
>>>>> Oculus Info Inc
>>>>> 2 Berkeley Street, Suite 600,
>>>>> Toronto, Ontario M5A 4J5
>>>>> Phone:  +1-416-203-3003 x 238
>>>>> Email:  nkronenfeld@oculusinfo.com
>>>>>
>>>>
>>>>
>>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenfeld@oculusinfo.com
>

Re: almost sorted data

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
I'm not sure what you're asking.

At some level, all RDDs only do partition-wise operations - they all only
operate on one partition at a time.

I suspect what you're looking for is something where you could just write:

data.mapPartitions(_.sortBy(...))

If that's what you want, then no - but only because Iterator has no sortBy
method.  I'm not sure why mapPartitions hands one an iterator rather than a
list.  Presumably so one can avoid having to have the whole partition in
memory at once - but equally presumably, one already has the whole
partition in memory at once, so that seems odd to me.  Anyone know why?
Perhaps to allow for worst-case scenarios?

             -Nathan



On Mon, Oct 28, 2013 at 4:54 AM, Arun Kumar <ar...@gmail.com> wrote:

> I will try using per partition sorted data. Can I also use groupBy and
> join per partition? Basically I want to restrict the computation per
> partition like using this data.mapPartitions(_.toList.sortBy(...).toIterator).
> Is there a more direct way to create a RDD that does partition wise
> operations?
>
>
> On Sat, Oct 26, 2013 at 3:50 AM, Aaron Davidson <il...@gmail.com>wrote:
>
>> Currently, our sortByKey should be using Java's native Timsort
>> implementation, which is an adaptive sort. That should also mean sorting is
>> very fast for almost-sorted data. The overhead you're seeing might be
>> caused by reshuffling everything during the range partitioning step *before
>> *the sort, which has to serialize all your data.
>>
>> Nathan's solution might then work out nicely for you, as it will avoid
>> shuffling the data.
>>
>>
>> On Fri, Oct 25, 2013 at 9:18 AM, Josh Rosen <ro...@gmail.com> wrote:
>>
>>> Adaptive sorting algorithms (https://en.wikipedia.org/wiki/Adaptive_sort)
>>> can benefit from presortedness in their inputs, so that might be a
>>> helpful search term for researching this problem.
>>>
>>>
>>> On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
>>> nkronenfeld@oculusinfo.com> wrote:
>>>
>>>> I suspect from his description the difference is negligible for his
>>>> case.  However, there are ways around that anyway.
>>>>
>>>> Assuming a fixed data set (as opposed to something like a streaming
>>>> example, where there is no last element), one can take 3 passes to:
>>>>
>>>>    1. get the last element of each partition
>>>>    2. take elements from each partition that fall before the last
>>>>    element of the previous partition, separate them from the rest of their
>>>>    partition
>>>>    3. and add them to the previous (whichever previous is appropriate,
>>>>    in really degenerate cases, which it sounds like he doesn't have) in the
>>>>    right location
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org>wrote:
>>>>
>>>>> Using a local sort per partition only gives a correct result if the
>>>>> data
>>>>> is already range partitioned.
>>>>>
>>>>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>>>>> > Since no one else has answered...
>>>>> > I assume:
>>>>> >
>>>>> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
>>>>> >
>>>>> > would work, but I also suspect there's a better way.
>>>>> >
>>>>> >
>>>>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
>>>>> wrote:
>>>>> >
>>>>> >> Hi,
>>>>> >>
>>>>> >> I am trying to process some logs and the data is sorted(*almost*) by
>>>>> >> timestamp.
>>>>> >> If I do a full sort it takes a lot of time. Is there some way to
>>>>> sort more
>>>>> >> efficiently (like restricting sort to per partition).
>>>>> >>
>>>>> >> Thanks in advance
>>>>> >>
>>>>> >
>>>>> >
>>>>> >
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Nathan Kronenfeld
>>>> Senior Visualization Developer
>>>> Oculus Info Inc
>>>> 2 Berkeley Street, Suite 600,
>>>> Toronto, Ontario M5A 4J5
>>>> Phone:  +1-416-203-3003 x 238
>>>> Email:  nkronenfeld@oculusinfo.com
>>>>
>>>
>>>
>>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com

Re: almost sorted data

Posted by Arun Kumar <ar...@gmail.com>.
I will try using per partition sorted data. Can I also use groupBy and join
per partition? Basically I want to restrict the computation per partition
like using this data.mapPartitions(_.toList.sortBy(...).toIterator). Is
there a more direct way to create a RDD that does partition wise operations?


On Sat, Oct 26, 2013 at 3:50 AM, Aaron Davidson <il...@gmail.com> wrote:

> Currently, our sortByKey should be using Java's native Timsort
> implementation, which is an adaptive sort. That should also mean sorting is
> very fast for almost-sorted data. The overhead you're seeing might be
> caused by reshuffling everything during the range partitioning step *before
> *the sort, which has to serialize all your data.
>
> Nathan's solution might then work out nicely for you, as it will avoid
> shuffling the data.
>
>
> On Fri, Oct 25, 2013 at 9:18 AM, Josh Rosen <ro...@gmail.com> wrote:
>
>> Adaptive sorting algorithms (https://en.wikipedia.org/wiki/Adaptive_sort)
>> can benefit from presortedness in their inputs, so that might be a
>> helpful search term for researching this problem.
>>
>>
>> On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
>> nkronenfeld@oculusinfo.com> wrote:
>>
>>> I suspect from his description the difference is negligible for his
>>> case.  However, there are ways around that anyway.
>>>
>>> Assuming a fixed data set (as opposed to something like a streaming
>>> example, where there is no last element), one can take 3 passes to:
>>>
>>>    1. get the last element of each partition
>>>    2. take elements from each partition that fall before the last
>>>    element of the previous partition, separate them from the rest of their
>>>    partition
>>>    3. and add them to the previous (whichever previous is appropriate,
>>>    in really degenerate cases, which it sounds like he doesn't have) in the
>>>    right location
>>>
>>>
>>>
>>>
>>> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org>wrote:
>>>
>>>> Using a local sort per partition only gives a correct result if the data
>>>> is already range partitioned.
>>>>
>>>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>>>> > Since no one else has answered...
>>>> > I assume:
>>>> >
>>>> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
>>>> >
>>>> > would work, but I also suspect there's a better way.
>>>> >
>>>> >
>>>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
>>>> wrote:
>>>> >
>>>> >> Hi,
>>>> >>
>>>> >> I am trying to process some logs and the data is sorted(*almost*) by
>>>> >> timestamp.
>>>> >> If I do a full sort it takes a lot of time. Is there some way to
>>>> sort more
>>>> >> efficiently (like restricting sort to per partition).
>>>> >>
>>>> >> Thanks in advance
>>>> >>
>>>> >
>>>> >
>>>> >
>>>>
>>>>
>>>
>>>
>>> --
>>> Nathan Kronenfeld
>>> Senior Visualization Developer
>>> Oculus Info Inc
>>> 2 Berkeley Street, Suite 600,
>>> Toronto, Ontario M5A 4J5
>>> Phone:  +1-416-203-3003 x 238
>>> Email:  nkronenfeld@oculusinfo.com
>>>
>>
>>
>

Re: almost sorted data

Posted by Aaron Davidson <il...@gmail.com>.
Currently, our sortByKey should be using Java's native Timsort
implementation, which is an adaptive sort. That should also mean sorting is
very fast for almost-sorted data. The overhead you're seeing might be
caused by reshuffling everything during the range partitioning step *before
*the sort, which has to serialize all your data.

Nathan's solution might then work out nicely for you, as it will avoid
shuffling the data.


On Fri, Oct 25, 2013 at 9:18 AM, Josh Rosen <ro...@gmail.com> wrote:

> Adaptive sorting algorithms (https://en.wikipedia.org/wiki/Adaptive_sort)
> can benefit from presortedness in their inputs, so that might be a
> helpful search term for researching this problem.
>
>
> On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
> nkronenfeld@oculusinfo.com> wrote:
>
>> I suspect from his description the difference is negligible for his case.
>>  However, there are ways around that anyway.
>>
>> Assuming a fixed data set (as opposed to something like a streaming
>> example, where there is no last element), one can take 3 passes to:
>>
>>    1. get the last element of each partition
>>    2. take elements from each partition that fall before the last
>>    element of the previous partition, separate them from the rest of their
>>    partition
>>    3. and add them to the previous (whichever previous is appropriate,
>>    in really degenerate cases, which it sounds like he doesn't have) in the
>>    right location
>>
>>
>>
>>
>> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org>wrote:
>>
>>> Using a local sort per partition only gives a correct result if the data
>>> is already range partitioned.
>>>
>>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>>> > Since no one else has answered...
>>> > I assume:
>>> >
>>> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
>>> >
>>> > would work, but I also suspect there's a better way.
>>> >
>>> >
>>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
>>> wrote:
>>> >
>>> >> Hi,
>>> >>
>>> >> I am trying to process some logs and the data is sorted(*almost*) by
>>> >> timestamp.
>>> >> If I do a full sort it takes a lot of time. Is there some way to sort
>>> more
>>> >> efficiently (like restricting sort to per partition).
>>> >>
>>> >> Thanks in advance
>>> >>
>>> >
>>> >
>>> >
>>>
>>>
>>
>>
>> --
>> Nathan Kronenfeld
>> Senior Visualization Developer
>> Oculus Info Inc
>> 2 Berkeley Street, Suite 600,
>> Toronto, Ontario M5A 4J5
>> Phone:  +1-416-203-3003 x 238
>> Email:  nkronenfeld@oculusinfo.com
>>
>
>

Re: almost sorted data

Posted by Josh Rosen <ro...@gmail.com>.
Adaptive sorting algorithms (https://en.wikipedia.org/wiki/Adaptive_sort)
can benefit from presortedness in their inputs, so that might be a helpful
search term for researching this problem.


On Fri, Oct 25, 2013 at 7:23 AM, Nathan Kronenfeld <
nkronenfeld@oculusinfo.com> wrote:

> I suspect from his description the difference is negligible for his case.
>  However, there are ways around that anyway.
>
> Assuming a fixed data set (as opposed to something like a streaming
> example, where there is no last element), one can take 3 passes to:
>
>    1. get the last element of each partition
>    2. take elements from each partition that fall before the last element
>    of the previous partition, separate them from the rest of their partition
>    3. and add them to the previous (whichever previous is appropriate, in
>    really degenerate cases, which it sounds like he doesn't have) in the right
>    location
>
>
>
>
> On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org>wrote:
>
>> Using a local sort per partition only gives a correct result if the data
>> is already range partitioned.
>>
>> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
>> > Since no one else has answered...
>> > I assume:
>> >
>> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
>> >
>> > would work, but I also suspect there's a better way.
>> >
>> >
>> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
>> wrote:
>> >
>> >> Hi,
>> >>
>> >> I am trying to process some logs and the data is sorted(*almost*) by
>> >> timestamp.
>> >> If I do a full sort it takes a lot of time. Is there some way to sort
>> more
>> >> efficiently (like restricting sort to per partition).
>> >>
>> >> Thanks in advance
>> >>
>> >
>> >
>> >
>>
>>
>
>
> --
> Nathan Kronenfeld
> Senior Visualization Developer
> Oculus Info Inc
> 2 Berkeley Street, Suite 600,
> Toronto, Ontario M5A 4J5
> Phone:  +1-416-203-3003 x 238
> Email:  nkronenfeld@oculusinfo.com
>

Re: almost sorted data

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
I suspect from his description the difference is negligible for his case.
 However, there are ways around that anyway.

Assuming a fixed data set (as opposed to something like a streaming
example, where there is no last element), one can take 3 passes to:

   1. get the last element of each partition
   2. take elements from each partition that fall before the last element
   of the previous partition, separate them from the rest of their partition
   3. and add them to the previous (whichever previous is appropriate, in
   really degenerate cases, which it sounds like he doesn't have) in the right
   location




On Fri, Oct 25, 2013 at 10:17 AM, Sebastian Schelter <ss...@apache.org> wrote:

> Using a local sort per partition only gives a correct result if the data
> is already range partitioned.
>
> On 25.10.2013 16:11, Nathan Kronenfeld wrote:
> > Since no one else has answered...
> > I assume:
> >
> >     data.mapPartitions(_.toList.sortBy(...).toIterator)
> >
> > would work, but I also suspect there's a better way.
> >
> >
> > On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com>
> wrote:
> >
> >> Hi,
> >>
> >> I am trying to process some logs and the data is sorted(*almost*) by
> >> timestamp.
> >> If I do a full sort it takes a lot of time. Is there some way to sort
> more
> >> efficiently (like restricting sort to per partition).
> >>
> >> Thanks in advance
> >>
> >
> >
> >
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com

Re: almost sorted data

Posted by Sebastian Schelter <ss...@apache.org>.
Using a local sort per partition only gives a correct result if the data
is already range partitioned.

On 25.10.2013 16:11, Nathan Kronenfeld wrote:
> Since no one else has answered...
> I assume:
> 
>     data.mapPartitions(_.toList.sortBy(...).toIterator)
> 
> would work, but I also suspect there's a better way.
> 
> 
> On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com> wrote:
> 
>> Hi,
>>
>> I am trying to process some logs and the data is sorted(*almost*) by
>> timestamp.
>> If I do a full sort it takes a lot of time. Is there some way to sort more
>> efficiently (like restricting sort to per partition).
>>
>> Thanks in advance
>>
> 
> 
> 


Re: almost sorted data

Posted by Nathan Kronenfeld <nk...@oculusinfo.com>.
Since no one else has answered...
I assume:

    data.mapPartitions(_.toList.sortBy(...).toIterator)

would work, but I also suspect there's a better way.


On Fri, Oct 25, 2013 at 5:01 AM, Arun Kumar <ar...@gmail.com> wrote:

> Hi,
>
> I am trying to process some logs and the data is sorted(*almost*) by
> timestamp.
> If I do a full sort it takes a lot of time. Is there some way to sort more
> efficiently (like restricting sort to per partition).
>
> Thanks in advance
>



-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com