You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Martin Loncaric (Jira)" <ji...@apache.org> on 2020/07/19 01:08:00 UTC

[jira] [Comment Edited] (SPARK-31356) reduceGroupValues function for KeyValueGroupedDataset

    [ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ] 

Martin Loncaric edited comment on SPARK-31356 at 7/19/20, 1:07 AM:
-------------------------------------------------------------------

Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns in the case when the groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way)
4. actually the RDD's API is roughly a whole 2x faster. There might be even more room to improve aggregations


was (Author: mwlon):
Actually, there seem to be 4 separate performance issues:
1. unnecessary serialize + deserialize
2. unnecessary map
3. unnecessary appendColumns when groupByKey function just returns a subset of columns (though this is hard to get around in a type safe way)
4. actually the RDD's API is roughly a whole 2x faster. There might be even more room to improve aggregations

> reduceGroupValues function for KeyValueGroupedDataset
> -----------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the the optimized plan unfortunately ends up with an unnecessary implicit serialization during aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V)}}. For this common use case, there are 2 things we can improve with a specialized {{.reduceGroupValues}}:
> 1. avoid the extra serialization (baked in to AggregationIterator implementations) and deserialization
> 2. avoid requiring a {{.map}} and the time it takes to unpack tuples by emitting the values only
> Proposal:
> Create an {{AggregationIteratorBase}} superclass that can emit general {{InternalRow}}s instead of just {{UnsafeRow}}s.
> Create a new {{AggregationIteratorBase}} implementation called {{ObjectValuesAggregationIterator}} that emits {{InternalRow}}s containing only the values instead of serializing them into {{UnsafeRow}}s on {{Final}} or {{Complete}} modes. Since we don't need to emit the keys, which are serialized, this is not too complicated. To make use of this, have the {{.reduceGroupValues}} return an {{Aggregate}} wrapped in a {{CatalystSerde.serialize}}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org