Posted to dev@spark.apache.org by "assaf.mendelson" <as...@rsa.com> on 2016/09/27 14:02:13 UTC

https://issues.apache.org/jira/browse/SPARK-17691

Hi,

I wanted to try to implement https://issues.apache.org/jira/browse/SPARK-17691.
So I started by looking at the implementation of collect_list. My idea was to do the same as it does, except that when adding a new element to a buffer that is already at the threshold, we remove one instead.
The problem with this is that since collect_list has no partial aggregation, we would end up shuffling all the data anyway. So while the resulting column might be smaller, the whole process would be as expensive as collect_list.
So I thought of adding partial aggregation. The problem is that the merge function receives a buffer in row format, whereas collect_list doesn't use that buffer at all and collects the data in its own data structure.
I could change the implementation to use Spark's ArrayType instead; however, since ArrayType is immutable, I would need to copy it whenever I do anything.
Consider the simplest implementation of the update function:
If there are few elements => add the element to the array (with a regular Array this means copying as it grows, which is fine at this stage).
If there are enough elements => we do not grow the array. Instead we need to decide what to replace: if we want the top 10, for example, and there are already 10 elements, we drop the lowest and insert the new one.
This means that if we simply loop across the array, we create a new copy and pay for both the copy and the loop. If we keep the array sorted, then adding, sorting, and removing the lowest element means three copies.
If I were able to use Scala's Array, I would copy only while growing, and once the array had grown to the max all I would need to do is REPLACE the relevant element, which is much cheaper.
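
For illustration, a minimal sketch of that update logic in plain Scala (TopNBuffer and the choice of Long elements are mine, purely hypothetical, not anything in Spark):

    // Sketch: a bounded "top N" buffer that copies while growing and,
    // once full, replaces the minimum in place instead of copying.
    class TopNBuffer(maxSize: Int) {
      private var values: Array[Long] = Array.empty[Long]

      def update(v: Long): Unit = {
        if (values.length < maxSize) {
          values = values :+ v // growing phase: copy-on-append is acceptable here
        } else {
          // full phase: find the smallest element and overwrite it if v is larger
          var minIdx = 0
          var i = 1
          while (i < values.length) {
            if (values(i) < values(minIdx)) minIdx = i
            i += 1
          }
          if (v > values(minIdx)) values(minIdx) = v // in-place, no copy
        }
      }

      def result: Array[Long] = values.sorted(Ordering[Long].reverse)
    }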

The only other solution I see is to simply provide a "take first N" aggregation function and have the user sort beforehand, but this seems like a bad solution to me, both because the sort is expensive and because if we do multiple aggregations we can't sort in two different ways.


I can't find a way to convert the internal data structure that collect_list uses into a row-format buffer before the merge.
I also can't find any way to use an array in the internal buffer as a mutable array. Looking at the GenericMutableRow implementation, updating an array means creating a new one. I thought of adding a function update_array_element, which would change the relevant element in place (and similarly get_array_element to read one), making the array effectively mutable, but the documentation states this is not allowed.

Can anyone give me a tip on where to try to go from here?





Re: https://issues.apache.org/jira/browse/SPARK-17691

Posted by Herman van Hövell tot Westerflier <hv...@databricks.com>.
Hi Asaf,

The current collect_list/collect_set implementations have room for
improvement. We did not implement partial aggregation for these, because
the idea of a partial aggregation is that we can reduce network traffic (by
shipping fewer partially aggregated buffers); this does not really apply to
collect_list, where the typical use case is to change the shape of the data.

I think you have two simple options here:

   1. In the latest branch we added a TypedImperativeAggregate. This allows
   you to use any object as an aggregation buffer. You will need to do some
   serialization though. The ApproximatePercentile aggregate function uses
   this technique; a rough sketch of the buffer side follows this list.
   2. Exploit the fact that you want to collect a limited number of elements.
   You can use a row as the buffer, which is much easier to work with. See
   HyperLogLogPlusPlus for an example of this.
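
To make option 1 concrete, here is a rough sketch of the buffer object such an aggregate could manage; the TypedImperativeAggregate callbacks (createAggregationBuffer, update, merge, eval, serialize, deserialize) would delegate to it. The names, the Long element type, and the naive byte layout are all my own assumptions, not what ApproximatePercentile actually does:

   import java.nio.ByteBuffer

   // Hypothetical buffer for a bounded collect. The aggregate's
   // serialize/deserialize would call toBytes/fromBytes.
   case class BoundedCollect(maxSize: Int, var values: List[Long] = Nil) {

     def add(v: Long): Unit = {
       values = (v :: values).sorted(Ordering[Long].reverse).take(maxSize)
     }

     def merge(other: BoundedCollect): Unit = {
       values = (values ++ other.values).sorted(Ordering[Long].reverse).take(maxSize)
     }

     // Naive layout: an Int length prefix followed by the Long values.
     def toBytes: Array[Byte] = {
       val buf = ByteBuffer.allocate(4 + 8 * values.length)
       buf.putInt(values.length)
       values.foreach(buf.putLong(_))
       buf.array()
     }
   }

   object BoundedCollect {
     def fromBytes(maxSize: Int, bytes: Array[Byte]): BoundedCollect = {
       val buf = ByteBuffer.wrap(bytes)
       val n = buf.getInt
       BoundedCollect(maxSize, List.fill(n)(buf.getLong))
     }
   }

Option 2 would instead declare a fixed number of slots in the aggregation buffer schema and update them in place, much like HyperLogLogPlusPlus updates its fixed set of words.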


HTH
-Herman
