Posted to dev@spark.apache.org by "assaf.mendelson" <as...@rsa.com> on 2016/09/18 16:06:33 UTC

Memory usage for spark types

Hi,
I am trying to understand how spark types are kept in memory and accessed.
I looked at the definitions of MapType and ArrayType, for example, but I can't seem to find the code for their actual in-memory implementation.

I am trying to figure out how these two types are implemented to understand whether they match my needs.
In general, a map appears to take the same space as two arrays, which is about double the naïve array implementation: if I have 1000 rows, each with a map from 10K integers to 10K integers, caching the dataframe shows a total of ~150MB (the naïve implementation of two plain arrays would cost 1000*10000*(4+4) bytes, or a total of ~80MB). I see the same size if I use two array columns instead of a map. Second, what would be the performance of updating the map/arrays, given that they are immutable (i.e. some copying is required)?
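For reference, here is a minimal sketch (Scala, assuming a Spark 2.x SparkSession named spark; column names are illustrative) of the kind of measurement described above: build the same data once as a map column and once as two array columns, cache both, and compare the in-memory sizes.

import spark.implicits._

val n = 1000
val m = 10000

// One map(int -> int) column with 10K entries per row.
val asMap = spark.range(n)
  .map(_ => Tuple1((0 until m).map(i => i -> i).toMap))
  .toDF("m")

// The same data as two array(int) columns.
val asArrays = spark.range(n)
  .map(_ => ((0 until m).toArray, (0 until m).toArray))
  .toDF("keys", "values")

asMap.cache().count()      // materialize the caches
asArrays.cache().count()

// Sizes show up in the Storage tab of the UI; one developer-API way to read them:
println(spark.sparkContext.getRDDStorageInfo
  .map(info => s"${info.name}: ${info.memSize} bytes").mkString("\n"))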

The reason I am asking this is because I wanted to do an aggregate function which calculates a variation of a histogram.
The most naïve solution for this would be to have a map from the bin to the count. But since we are talking about an immutable map, wouldn't that cost a lot more?
A further optimization would be to use a mutable array where the key and value are combined into a single value (both are ints in my case). Assuming the maximum number of bins is small (e.g. less than 10), it is often cheaper to just search the array for the right key, and in that case the data is expected to be significantly smaller than a map. In my case, most of the time (90%) there are fewer than 3 elements in the bins, and if I end up with more than 10 bins I combine bins to reduce the number.
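As an illustration of the packing idea (plain Scala, helper names are made up), each (key, value) pair of ints can be stored as a single Long in a small mutable array that is scanned linearly:

object HistBins {
  def pack(key: Int, value: Int): Long = (key.toLong << 32) | (value & 0xffffffffL)
  def keyOf(p: Long): Int = (p >>> 32).toInt
  def valueOf(p: Long): Int = p.toInt

  // Increment the count for key k; entries at indices >= used are free slots.
  // Returns the new number of used slots.
  def increment(bins: Array[Long], used: Int, k: Int): Int = {
    var i = 0
    while (i < used) {
      if (keyOf(bins(i)) == k) {
        bins(i) = pack(k, valueOf(bins(i)) + 1)
        return used
      }
      i += 1
    }
    bins(used) = pack(k, 1)   // new bin; combining bins when exceeding 10 is left to the caller
    used + 1
  }
}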

For a small number of elements, a map becomes very inefficient: if I create 10M rows, each with a single-entry map from int to int, I get a total of ~380MB, i.e. ~38 bytes per element (instead of just 8). An array is also too large (229MB, i.e. ~23 bytes per element).

Is there a way to implement a simple mutable array type to use in the aggregation buffer? Where in the code is the actual type representation handled?
Thanks,
                Assaf.





RE: Memory usage for spark types

Posted by "assaf.mendelson" <as...@rsa.com>.
Thanks for the pointer.

I have been reading the code and trying to understand how to create an efficient aggregate function, but I must be missing something, because it seems to me that any aggregation function which uses non-primitive types would have a high overhead.
Consider the following simple example: we have a column which contains the numbers 1-10 and we want to calculate a histogram of these values.
Relative to the hand-written code in https://databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html, the trivial solution (a student solution) would look something like this:
val hist = new Array[Int](10)
for (v <- col) {
  hist(v - 1) += 1   // values 1..10 map to slots 0..9
}

The problem is that, as far as I understand, Spark would not generate the code this way.
Instead I would need to do something like “update hist at position v by +1”, which in practice means the array would be copied at least 3 times:
First, it is copied from its unsafe representation into a Scala sequence (even worse, since the unsafe arrays use per-element offsets, the copy has to be done element by element rather than as a single memcopy). Then, since the sequence is immutable, a new version of it has to be created (copying everything and changing only the relevant element). Finally, it is copied back into the unsafe representation.
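To make that chain concrete, here is a hedged illustration at the level of the public Row API (the actual unsafe conversions happen inside Spark; this only mimics the shape of the three steps):

import org.apache.spark.sql.Row

val buffer  = Row(Seq(0, 0, 0, 0, 0, 0, 0, 0, 0, 0))   // buffer row with one array field
val hist    = buffer.getSeq[Int](0)                    // (1) materialized as a Scala Seq
val bumped  = hist.updated(2, hist(2) + 1)             // (2) immutable update = full copy
val updated = Row(bumped)                              // (3) written back as a new buffer row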

I tried to look at examples in the code that have an intermediate buffer which is not a simple structure. Basically, I see two types of examples: distinct operations (which, if I understand correctly, internally use a hashmap to hold the distinct values, but I can't find the code which generates it), and the collect functions (collect_list, collect_set), which do not appear to do any code generation but define their own buffer as they please (the buffer is NOT a regular Spark SQL type).


So I was wondering: what is the right way to implement efficient logic like the above?
I see two options:

1. Using a UDAF – in this case I would define the buffer to have 10 integer fields and manipulate each of them (a minimal sketch of this option appears after this list). This solution suffers from two problems. First, it is slow (especially if other aggregations in the query use Spark SQL expressions). Second, it is limited: I can't change the size of the buffer mid-aggregation. For example, suppose the histogram is computed per group and I know beforehand that in 99% of the cases there are 3 values but in 1% of the cases there are 100; with an array buffer I would just switch to a bigger array the first time I see one of the 100 values.

2. Implementing something similar to collect_list and collect_set. According to the documentation of the Collect class, this uses the slower sort-based aggregation path because the number of elements cannot be determined in advance, even though in the basic case above we do know the size (and I am not sure how its performance would compare to the UDAF option). This appears to be simpler than a UDAF because I can use the data types I want directly; however, I can't figure out how the code generation is done, as I do not see the relevant functions when running debugCodegen on the result.
I also believe there should be a third option, actually implementing a proper expression, but I have no idea how to do that.
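For what it's worth, here is a minimal sketch of option 1 (names are illustrative; the bins are assumed to be the fixed values 1..10, flattened into ten integer buffer fields so that each update touches a single primitive slot rather than copying an array):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class FixedHistogram extends UserDefinedAggregateFunction {
  private val numBins = 10

  def inputSchema: StructType = StructType(StructField("value", IntegerType) :: Nil)
  def bufferSchema: StructType =
    StructType((0 until numBins).map(i => StructField(s"bin$i", IntegerType)))
  def dataType: DataType = ArrayType(IntegerType)
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit =
    (0 until numBins).foreach(i => buffer(i) = 0)

  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) {
      val bin = input.getInt(0) - 1              // values 1..10 -> slots 0..9
      buffer(bin) = buffer.getInt(bin) + 1
    }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    (0 until numBins).foreach(i => buffer1(i) = buffer1.getInt(i) + buffer2.getInt(i))

  def evaluate(buffer: Row): Any = (0 until numBins).map(buffer.getInt(_))
}

It would be used as, e.g., df.groupBy("key").agg(new FixedHistogram()(col("value"))); whether this beats the collect_* route is exactly the open question here.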


Can anyone point me in the right direction?


From: rxin [via Apache Spark Developers List]
Sent: Monday, September 19, 2016 12:23 AM
To: Mendelson, Assaf
Subject: Re: Memory usage for spark types

Take a look at UnsafeArrayData and UnsafeMapData.







Re: Memory usage for spark types

Posted by Reynold Xin <rx...@databricks.com>.
Take a look at UnsafeArrayData and UnsafeMapData.

