Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/03/12 07:01:04 UTC

[jira] [Closed] (SPARK-7629) Unroll AggregateFunction "update" in the case of a single input expression

     [ https://issues.apache.org/jira/browse/SPARK-7629?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Takeshi Yamamuro closed SPARK-7629.
-----------------------------------
    Resolution: Won't Fix

> Unroll AggregateFunction "update" in the case of a single input expression
> --------------------------------------------------------------------------
>
>                 Key: SPARK-7629
>                 URL: https://issues.apache.org/jira/browse/SPARK-7629
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 1.2.1
>            Reporter: Max Seiden
>            Priority: Minor
>
> h3. Summary
> This was observed on 1.2.1. Many of the AggregateFunctions take a list of Expressions as input, wrap them up in an InterpretedProjection, and evaluate the InterpretedProjection to get the arguments for the actual aggregate expression. In the case where there is only a single input expression however, this leads to gross inefficiencies. 
> Take CountDistinctFunction as an example, and assume it has a single input expression of type String. In this case spark uses an OpenHashSet[Any] to collect a distinct set of GenericRow(Array[Any](string)). This is hugely inefficient, since every String object must be wrapped up in first an Array and second a GenericRow, and then inserted into the OpenHashSet where all hashing and equality comparisons happen on the GenericRow. 
> This means that any single OpenHashSet entry has unnecessary overhead from the GenericRow and Array[Any], and all hashcode and equality operations must go through the Row. In the case of hashcode, this means that every invocation requires a while loop and pattern match. In the case of equality, Seq[Any].equals is used, which requires (for both the candidate and existing objects) a pattern match, call to "canEqual", and call to "sameElements" - the last of these (in IterableLike as far as I can tell) constructs a Scala Iterator over the Array[Any] and does element-by-element comparisons. 
> Needless to say, this requires far too many cycles in the case of a single input Expression, has a high and unnecessary memory overhead, and generates a ton of garbage. To give a concrete example, I am unable to compute a grand distinct on an input of 15M unique IP addresses in a 2GB JVM. After a few seconds of running, I start to get substantial GCs, and eventually get into a state where the JVM is stuck doing full GCs. Additionally a profile of the executor thread shows that 85% of the time is spent rehashing (perhaps explainable at that scale), but that 65% of the time is spent in GenericRow.hashCode. 
> My proposed improvement is to unroll those aggregate functions in the case of a single input expression so that the inefficiencies described above can be avoided altogether. The only tricky bits here are dealing with NULLs (Row does that for us) and efficiently handling both the single input and multi-input cases within the same aggregate function impl.
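The quoted report can be illustrated with a standalone sketch. GenericRow and OpenHashSet are Spark internals, so the hypothetical code below stands in for them with plain Scala collections: Seq[Any] plays the role of the row wrapper (its equals/hashCode traverse elements, just as described above), and scala.collection.mutable.HashSet plays the role of OpenHashSet. The object and method names are illustrative, not Spark API.

```scala
import scala.collection.mutable

object DistinctSketch {
  // Wrapped path, analogous to inserting GenericRow(Array[Any](value)):
  // every value is boxed in a Seq[Any], and the set's hashing and equality
  // go through the wrapper's element-by-element hashCode/equals.
  def countDistinctWrapped(values: Seq[String]): Int = {
    val set = mutable.HashSet.empty[Seq[Any]]
    values.foreach(v => set += Seq[Any](v))
    set.size
  }

  // Unrolled path, analogous to the proposed single-input specialization:
  // the value is inserted directly, so hashing and equality are just
  // String.hashCode and String.equals, with no wrapper allocation.
  def countDistinctUnrolled(values: Seq[String]): Int = {
    val set = mutable.HashSet.empty[String]
    values.foreach(set += _)
    set.size
  }
}
```

Both paths compute the same distinct count; the difference is that the wrapped path allocates a container per input value and pays for indirect hashCode/equals on every probe, which is the overhead the ticket proposes to eliminate.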



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org