Posted to dev@spark.apache.org by RJ Nowling <rn...@gmail.com> on 2015/06/30 20:01:57 UTC

Grouping runs of elements in a RDD

Hi all,

I have an RDD of elements:

Item1 Item2 Item3 Item4 Item5 Item6 ...

and I want to run a function over them to decide which runs of elements to
group together:

[Item1 Item2] [Item3] [Item4 Item5 Item6] ...

Technically, I could use aggregate to do this, but I would have to use a
List of List of T which would produce a very large collection in memory.

Is there an easy way to accomplish this?  For example, it would be nice to
have a version of aggregate where the combination function can return a
complete group that is added to the new RDD and an incomplete group which is
passed to the next call of the combination function.

Thanks,
RJ
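
For illustration, here is a minimal sketch of the aggregate-style approach
mentioned above. It is plain Python with nested lists standing in for RDD
partitions, so it is not Spark code. The names same_run, seq_op and comb_op
are hypothetical, and the predicate (consecutive integers belong to one run)
is only an assumed example. The point is that the accumulator ends up holding
every group at once.

```python
from functools import reduce

def same_run(prev, cur):
    # Assumed example predicate: consecutive integers belong to the same run.
    return cur == prev + 1

def seq_op(groups, item):
    # Fold one item into a partition-local list of groups.
    if groups and same_run(groups[-1][-1], item):
        return groups[:-1] + [groups[-1] + [item]]
    return groups + [[item]]

def comb_op(left, right):
    # Merge two partial results, joining the run that straddles the boundary.
    if left and right and same_run(left[-1][-1], right[0][0]):
        return left[:-1] + [left[-1] + right[0]] + right[1:]
    return left + right

# Each inner list stands in for one partition of the RDD.
partitions = [[1, 2, 4], [5, 6, 9], [10]]
partials = [reduce(seq_op, part, []) for part in partitions]
result = reduce(comb_op, partials, [])
print(result)
```

With this input the result is [[1, 2], [4, 5, 6], [9, 10]]. The whole list of
lists is built in a single value, which is the memory concern described above.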

Re: Grouping runs of elements in a RDD

Posted by RJ Nowling <rn...@gmail.com>.
Thanks, Mohit.  It sounds like we're on the same page -- I used a similar
approach.

On Thu, Jul 2, 2015 at 12:27 PM, Mohit Jaggi <mo...@gmail.com> wrote:

> if you are joining successive lines together based on a predicate, then
> you are doing a "flatMap" not an "aggregate". you are on the right track
> with a multi-pass solution. i had the same challenge when i needed a
> sliding window over an RDD(see below).
>
> [ i had suggested that the sliding window API be moved to spark-core. not
> sure if that happened ]
>
> ----- previous posts ---
>
>
> http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions
>
> > On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <mo...@gmail.com>
> > wrote:
> >
> >
> > http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3CCALRVTpKN65rOLzbETC+Ddk4O+YJm+TfAF5DZ8EuCpL-2YHYPZA@mail.gmail.com%3E
> >
> > You can use the MLlib function or do the following (which is what I had
> > done):
> >
> > - In the first pass over the data, using mapPartitionsWithIndex, gather the
> > first item in each partition. You can use collect (or an aggregator) for
> > this. “Key” them by the partition index. At the end, you will have a map
> >    (partition index) --> first item
> > - In the second pass over the data, using mapPartitionsWithIndex again,
> > look at two items at a time (or, in the general case, N items at a time; you
> > can use Scala’s sliding iterator) and check the time difference (or any
> > sliding window computation). Pass the map created in the previous step to
> > this mapPartitionsWithIndex call. You will need it to look up the first item
> > of the next partition, which completes the window for the last item in this
> > partition.
> >
> > If you can tolerate a few inaccuracies, then you can just do the second
> > step. You will miss the “boundaries” of the partitions, but it might be
> > acceptable for your use case.
>
>
> On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling <rn...@gmail.com> wrote:
>
>> That's an interesting idea!  I hadn't considered that.  However, looking
>> at the Partitioner interface, I would need to know from looking at a single
>> key which doesn't fit my case, unfortunately.  For my case, I need to
>> compare successive pairs of keys.  (I'm trying to re-join lines that were
>> split prematurely.)
>>
>> On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <
>> abhishsi@tetrationanalytics.com> wrote:
>>
>>> could you use a custom partitioner to preserve boundaries such that all
>>> related tuples end up on the same partition?
>>>
>>> On Jun 30, 2015, at 12:00 PM, RJ Nowling <rn...@gmail.com> wrote:
>>>
>>> Thanks, Reynold.  I still need to handle incomplete groups that fall
>>> between partition boundaries. So, I need a two-pass approach. I came up
>>> with a somewhat hacky way to handle those using the partition indices and
>>> key-value pairs as a second pass after the first.
>>>
>>> OCaml's std library provides a function called group() that takes a
>>> break function that operators on pairs of successive elements.  It seems a
>>> similar approach could be used in Spark and would be more efficient than my
>>> approach with key-value pairs since you know the ordering of the partitions.
>>>
>>> Has this need been expressed by others?
>>>
>>> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <rx...@databricks.com>
>>> wrote:
>>>
>>>> Try mapPartitions, which gives you an iterator, and you can produce an
>>>> iterator back.
>>>>
>>>>
>>>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi all,
>>>>>
>>>>> I have a problem where I have a RDD of elements:
>>>>>
>>>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>>>
>>>>> and I want to run a function over them to decide which runs of
>>>>> elements to group together:
>>>>>
>>>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>>>
>>>>> Technically, I could use aggregate to do this, but I would have to use
>>>>> a List of List of T which would produce a very large collection in memory.
>>>>>
>>>>> Is there an easy way to accomplish this?  e.g.,, it would be nice to
>>>>> have a version of aggregate where the combination function can return a
>>>>> complete group that is added to the new RDD and an incomplete group which
>>>>> is passed to the next call of the reduce function.
>>>>>
>>>>> Thanks,
>>>>> RJ
>>>>>
>>>>
>>>>
>>>
>>>
>>
>

Re: Grouping runs of elements in a RDD

Posted by Mohit Jaggi <mo...@gmail.com>.
If you are joining successive lines together based on a predicate, then you
are doing a "flatMap", not an "aggregate". You are on the right track with a
multi-pass solution. I had the same challenge when I needed a sliding
window over an RDD (see below).

[ I had suggested that the sliding window API be moved to spark-core. Not
sure if that happened. ]

----- previous posts ---

http://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions

> On Fri, Jan 30, 2015 at 12:27 AM, Mohit Jaggi <mo...@gmail.com>
> wrote:
>
>
> http://mail-archives.apache.org/mod_mbox/spark-user/201405.mbox/%3CCALRVTpKN65rOLzbETC+Ddk4O+YJm+TfAF5DZ8EuCpL-2YHYPZA@mail.gmail.com%3E
>
> You can use the MLlib function or do the following (which is what I had
> done):
>
> - In the first pass over the data, using mapPartitionsWithIndex, gather the
> first item in each partition. You can use collect (or an aggregator) for this.
> “Key” them by the partition index. At the end, you will have a map
>    (partition index) --> first item
> - In the second pass over the data, using mapPartitionsWithIndex again,
> look at two items at a time (or, in the general case, N items at a time; you
> can use Scala’s sliding iterator) and check the time difference (or any
> sliding window computation). Pass the map created in the previous step to
> this mapPartitionsWithIndex call. You will need it to look up the first item
> of the next partition, which completes the window for the last item in this
> partition.
>
> If you can tolerate a few inaccuracies, then you can just do the second
> step. You will miss the “boundaries” of the partitions, but it might be
> acceptable for your use case.
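
The two passes described in the quoted post can be sketched as follows. This
is a local simulation in plain Python, with a list of lists standing in for
the partitions, not actual Spark code. The helper names first_items and
successive_pairs are hypothetical, and the window size is fixed at two for
brevity.

```python
def first_items(partitions):
    # Pass 1: map partition index -> first item (empty partitions are skipped).
    return {i: part[0] for i, part in enumerate(partitions) if part}

def successive_pairs(index, part, firsts):
    # Pass 2: pair each item with its successor. The last item of this
    # partition is paired with the first item of the next non-empty partition.
    following = [firsts[j] for j in sorted(firsts) if j > index][:1]
    extended = list(part) + following
    return list(zip(extended, extended[1:]))

# Each inner list stands in for one partition; one of them is empty.
partitions = [[1, 2, 3], [], [7, 8], [20]]
firsts = first_items(partitions)
pairs = [p for i, part in enumerate(partitions)
         for p in successive_pairs(i, part, firsts)]
gaps = [b - a for a, b in pairs]
print(pairs)
print(gaps)
```

Here the pairs (3, 7) and (8, 20) span partition boundaries. They are the ones
that would be missed if only the second pass were run without the map of first
items.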


On Tue, Jun 30, 2015 at 12:21 PM, RJ Nowling <rn...@gmail.com> wrote:

> That's an interesting idea!  I hadn't considered that.  However, looking
> at the Partitioner interface, I would need to know from looking at a single
> key which doesn't fit my case, unfortunately.  For my case, I need to
> compare successive pairs of keys.  (I'm trying to re-join lines that were
> split prematurely.)
>
> On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <
> abhishsi@tetrationanalytics.com> wrote:
>
>> could you use a custom partitioner to preserve boundaries such that all
>> related tuples end up on the same partition?
>>
>> On Jun 30, 2015, at 12:00 PM, RJ Nowling <rn...@gmail.com> wrote:
>>
>> Thanks, Reynold.  I still need to handle incomplete groups that fall
>> between partition boundaries. So, I need a two-pass approach. I came up
>> with a somewhat hacky way to handle those using the partition indices and
>> key-value pairs as a second pass after the first.
>>
>> OCaml's std library provides a function called group() that takes a break
>> function that operators on pairs of successive elements.  It seems a
>> similar approach could be used in Spark and would be more efficient than my
>> approach with key-value pairs since you know the ordering of the partitions.
>>
>> Has this need been expressed by others?
>>
>> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <rx...@databricks.com> wrote:
>>
>>> Try mapPartitions, which gives you an iterator, and you can produce an
>>> iterator back.
>>>
>>>
>>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I have a problem where I have a RDD of elements:
>>>>
>>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>>
>>>> and I want to run a function over them to decide which runs of elements
>>>> to group together:
>>>>
>>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>>
>>>> Technically, I could use aggregate to do this, but I would have to use
>>>> a List of List of T which would produce a very large collection in memory.
>>>>
>>>> Is there an easy way to accomplish this?  e.g.,, it would be nice to
>>>> have a version of aggregate where the combination function can return a
>>>> complete group that is added to the new RDD and an incomplete group which
>>>> is passed to the next call of the reduce function.
>>>>
>>>> Thanks,
>>>> RJ
>>>>
>>>
>>>
>>
>>
>

Re: Grouping runs of elements in a RDD

Posted by RJ Nowling <rn...@gmail.com>.
That's an interesting idea!  I hadn't considered that.  However, looking at
the Partitioner interface, I would need to determine the partition from a
single key, which doesn't fit my case, unfortunately.  For my case, I need to
compare successive pairs of keys.  (I'm trying to re-join lines that were
split prematurely.)

On Tue, Jun 30, 2015 at 2:07 PM, Abhishek R. Singh <
abhishsi@tetrationanalytics.com> wrote:

> could you use a custom partitioner to preserve boundaries such that all
> related tuples end up on the same partition?
>
> On Jun 30, 2015, at 12:00 PM, RJ Nowling <rn...@gmail.com> wrote:
>
> Thanks, Reynold.  I still need to handle incomplete groups that fall
> between partition boundaries. So, I need a two-pass approach. I came up
> with a somewhat hacky way to handle those using the partition indices and
> key-value pairs as a second pass after the first.
>
> OCaml's std library provides a function called group() that takes a break
> function that operators on pairs of successive elements.  It seems a
> similar approach could be used in Spark and would be more efficient than my
> approach with key-value pairs since you know the ordering of the partitions.
>
> Has this need been expressed by others?
>
> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <rx...@databricks.com> wrote:
>
>> Try mapPartitions, which gives you an iterator, and you can produce an
>> iterator back.
>>
>>
>> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> I have a problem where I have a RDD of elements:
>>>
>>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>>
>>> and I want to run a function over them to decide which runs of elements
>>> to group together:
>>>
>>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>>
>>> Technically, I could use aggregate to do this, but I would have to use a
>>> List of List of T which would produce a very large collection in memory.
>>>
>>> Is there an easy way to accomplish this?  e.g.,, it would be nice to
>>> have a version of aggregate where the combination function can return a
>>> complete group that is added to the new RDD and an incomplete group which
>>> is passed to the next call of the reduce function.
>>>
>>> Thanks,
>>> RJ
>>>
>>
>>
>
>

Re: Grouping runs of elements in a RDD

Posted by "Abhishek R. Singh" <ab...@tetrationanalytics.com>.
Could you use a custom partitioner to preserve boundaries such that all related tuples end up on the same partition?

On Jun 30, 2015, at 12:00 PM, RJ Nowling <rn...@gmail.com> wrote:

> Thanks, Reynold.  I still need to handle incomplete groups that fall between partition boundaries. So, I need a two-pass approach. I came up with a somewhat hacky way to handle those using the partition indices and key-value pairs as a second pass after the first.
> 
> OCaml's std library provides a function called group() that takes a break function that operators on pairs of successive elements.  It seems a similar approach could be used in Spark and would be more efficient than my approach with key-value pairs since you know the ordering of the partitions.
> 
> Has this need been expressed by others?  
> 
> On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <rx...@databricks.com> wrote:
> Try mapPartitions, which gives you an iterator, and you can produce an iterator back.
> 
> 
> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com> wrote:
> Hi all,
> 
> I have a problem where I have a RDD of elements:
> 
> Item1 Item2 Item3 Item4 Item5 Item6 ...
> 
> and I want to run a function over them to decide which runs of elements to group together:
> 
> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
> 
> Technically, I could use aggregate to do this, but I would have to use a List of List of T which would produce a very large collection in memory.
> 
> Is there an easy way to accomplish this?  e.g.,, it would be nice to have a version of aggregate where the combination function can return a complete group that is added to the new RDD and an incomplete group which is passed to the next call of the reduce function.
> 
> Thanks,
> RJ
> 
> 


Re: Grouping runs of elements in a RDD

Posted by RJ Nowling <rn...@gmail.com>.
Thanks, Reynold.  I still need to handle incomplete groups that fall
between partition boundaries. So, I need a two-pass approach. I came up
with a somewhat hacky way to handle those using the partition indices and
key-value pairs as a second pass after the first.

OCaml's std library provides a function called group() that takes a break
function that operates on pairs of successive elements.  It seems a
similar approach could be used in Spark and would be more efficient than my
approach with key-value pairs since you know the ordering of the partitions.
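
As a rough sketch of the idea, a group() driven by a break function, followed
by a second pass that joins runs cut at partition boundaries, might look like
this. It is plain Python rather than Spark, and the names group, stitch and
brk are hypothetical. The break rule (a line ending in "." finishes a group)
is only an assumed stand-in for the real test on prematurely split lines. For
brevity the second pass merges everything locally; in Spark only the first
and last run of each partition would need to move.

```python
def group(items, brk):
    # Split items into runs; brk(prev, cur) is True where a new run starts.
    runs = []
    for item in items:
        if runs and not brk(runs[-1][-1], item):
            runs[-1].append(item)
        else:
            runs.append([item])
    return runs

def stitch(partition_runs, brk):
    # Second pass: walk partitions in index order and join runs that were
    # cut by a partition boundary.
    merged = []
    for index in sorted(partition_runs):
        for run in partition_runs[index]:
            if merged and not brk(merged[-1][-1], run[0]):
                merged[-1] = merged[-1] + run
            else:
                merged.append(list(run))
    return merged

def brk(prev, cur):
    # Assumed rule: a line that ends with "." completes its group.
    return prev.endswith(".")

# Key-value pairs: partition index -> lines in that partition.
partitions = {0: ["a b", "c.", "d"], 1: ["e.", "f."]}
first_pass = {i: group(lines, brk) for i, lines in partitions.items()}
second_pass = stitch(first_pass, brk)
print(second_pass)
```

The first pass leaves "d" and "e." in separate runs because they sit in
different partitions; the second pass joins them, giving
[['a b', 'c.'], ['d', 'e.'], ['f.']].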

Has this need been expressed by others?

On Tue, Jun 30, 2015 at 1:03 PM, Reynold Xin <rx...@databricks.com> wrote:

> Try mapPartitions, which gives you an iterator, and you can produce an
> iterator back.
>
>
> On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com> wrote:
>
>> Hi all,
>>
>> I have a problem where I have a RDD of elements:
>>
>> Item1 Item2 Item3 Item4 Item5 Item6 ...
>>
>> and I want to run a function over them to decide which runs of elements
>> to group together:
>>
>> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>>
>> Technically, I could use aggregate to do this, but I would have to use a
>> List of List of T which would produce a very large collection in memory.
>>
>> Is there an easy way to accomplish this?  e.g.,, it would be nice to have
>> a version of aggregate where the combination function can return a complete
>> group that is added to the new RDD and an incomplete group which is passed
>> to the next call of the reduce function.
>>
>> Thanks,
>> RJ
>>
>
>

Re: Grouping runs of elements in a RDD

Posted by Reynold Xin <rx...@databricks.com>.
Try mapPartitions, which gives you an iterator, and you can produce an
iterator back.
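
A minimal sketch of what such an iterator-in, iterator-out function could look
like, assuming a predicate that decides whether two successive items belong to
the same run. It is written as a plain Python generator and applied to lists
that stand in for partitions; group_runs and same_run are hypothetical names,
and no Spark API is called here.

```python
def group_runs(items, same_run):
    # Lazily yield runs of successive items; only one run is held at a time.
    current = []
    for item in items:
        if current and not same_run(current[-1], item):
            yield current
            current = []
        current.append(item)
    if current:
        yield current

def same_run(prev, cur):
    # Assumed example predicate: consecutive integers belong to the same run.
    return cur == prev + 1

# Each inner list stands in for one partition of the RDD.
partitions = [[1, 2, 4], [5, 6, 9]]
per_partition = [list(group_runs(iter(p), same_run)) for p in partitions]
print(per_partition)
```

Because each partition is processed on its own, 4 and 5 end up in different
groups even though the predicate would join them. Runs that cross a partition
boundary need separate handling.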


On Tue, Jun 30, 2015 at 11:01 AM, RJ Nowling <rn...@gmail.com> wrote:

> Hi all,
>
> I have a problem where I have a RDD of elements:
>
> Item1 Item2 Item3 Item4 Item5 Item6 ...
>
> and I want to run a function over them to decide which runs of elements to
> group together:
>
> [Item1 Item2] [Item3] [Item4 Item5 Item6] ...
>
> Technically, I could use aggregate to do this, but I would have to use a
> List of List of T which would produce a very large collection in memory.
>
> Is there an easy way to accomplish this?  e.g.,, it would be nice to have
> a version of aggregate where the combination function can return a complete
> group that is added to the new RDD and an incomplete group which is passed
> to the next call of the reduce function.
>
> Thanks,
> RJ
>
