Posted to issues@spark.apache.org by "Martin Loncaric (Jira)" <ji...@apache.org> on 2020/07/14 03:39:00 UTC

[jira] [Commented] (SPARK-31356) Splitting Aggregate node into separate Aggregate and Serialize for Optimizer

    [ https://issues.apache.org/jira/browse/SPARK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17157128#comment-17157128 ] 

Martin Loncaric commented on SPARK-31356:
-----------------------------------------

Actually, there seem to be 3 separate performance issues:
1. an unnecessary appendColumns when the groupByKey function just returns a subset of columns (though this is hard to avoid in a type-safe way)
2. an unnecessary serialize + deserialize
3. the RDD API is actually roughly 2x faster overall. It seems there's a lot of room to improve aggregations
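
For reference, here is a minimal sketch of the two paths being compared (hypothetical Purchase case class, local master; all names are illustrative, not from the issue). The Dataset variant is where the extra appendColumns and serialize/deserialize steps show up in the plan, while the RDD variant avoids the per-row encoder round trips:
{code:scala}
import org.apache.spark.sql.SparkSession

// Hypothetical record type, used only for illustration.
case class Purchase(user: String, amount: Double)

object AggregationComparison {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("agg-compare").getOrCreate()
    import spark.implicits._

    val ds = Seq(Purchase("a", 1.0), Purchase("a", 2.0), Purchase("b", 3.0)).toDS()

    // Dataset path: groupByKey + reduceGroups + map(_._2), as described in the issue below.
    val viaDataset = ds
      .groupByKey(_.user)
      .reduceGroups((a, b) => Purchase(a.user, a.amount + b.amount))
      .map(_._2)

    // RDD path for comparison: no encoder round trip around the aggregation.
    val viaRdd = ds.rdd
      .keyBy(_.user)
      .reduceByKey((a, b) => Purchase(a.user, a.amount + b.amount))
      .values

    viaDataset.show()
    viaRdd.collect().foreach(println)

    spark.stop()
  }
}
{code}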

> Splitting Aggregate node into separate Aggregate and Serialize for Optimizer
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-31356
>                 URL: https://issues.apache.org/jira/browse/SPARK-31356
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Martin Loncaric
>            Priority: Major
>
> Problem: in Datasets API, it is a very common pattern to do something like this whenever a complex reduce function is needed:
> {code:scala}
> ds
>   .groupByKey(_.y)
>   .reduceGroups((a, b) => {...})
>   .map(_._2)
> {code}
> However, the {{.map(_._2)}} step (taking the values and throwing the keys away) unfortunately often ends up as an unnecessary serialization during the aggregation step, followed by {{DeserializeToObject + MapElements (from (K, V) => V) + SerializeFromObject}} in the optimized logical plan (the sketch after this list shows one way to observe this). In this example, it would be better to either skip the deserialization/serialization or emit a {{Project (from (K, V) => V)}} instead. Even manually doing a {{.select(...).as[T]}} to replace the {{.map}} is quite tricky, because
> * the columns are complicated, like {{[value, ReduceAggregator(my.data.type)]}}, and seem to be impossible to {{.select}}
> * it breaks the nice type checking of Datasets
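> A minimal sketch (hypothetical Event case class; all names illustrative) of how the extra serialize/deserialize round trip can be observed in the optimized plan today:
> {code:scala}
> import org.apache.spark.sql.SparkSession
>
> // Hypothetical record type, used only for illustration.
> case class Event(key: String, count: Long)
>
> object PlanInspection {
>   def main(args: Array[String]): Unit = {
>     val spark = SparkSession.builder().master("local[*]").appName("plan-inspection").getOrCreate()
>     import spark.implicits._
>
>     val ds = Seq(Event("a", 1L), Event("a", 2L), Event("b", 5L)).toDS()
>
>     val reduced = ds
>       .groupByKey(_.key)
>       .reduceGroups((a, b) => Event(a.key, a.count + b.count))
>       .map(_._2)
>
>     // The extended output includes the optimized logical plan, where the
>     // DeserializeToObject + MapElements + SerializeFromObject chain appears.
>     reduced.explain(true)
>
>     spark.stop()
>   }
> }
> {code}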
> Proposal:
> Change the {{KeyValueGroupedDataset.aggUntyped}} method to append both an {{Aggregate}} node and a {{SerializeFromObject}} node (like {{KeyValueGroupedDataset.cogroup}} does), so that the Optimizer can eliminate the serialization when it is redundant. Change aggregations to emit deserialized results.
> I had 2 ideas for what we could change: either add a new {{.reduceGroupValues}} feature that projects to only the necessary columns, or make this improvement. I thought this would be the better solution because
> * it will improve the performance of existing Spark applications with no modifications
> * feature growth is undesirable
> Uncertainties:
> Affects Version: I'm not sure - if I submit a PR soon, can we get this into 3.0? Or only 3.1? And I assume we're not adding new features to 2.4?
> Complications: Are there any hazards in splitting {{Aggregate}} into {{Aggregate}} + {{SerializeFromObject}} that I'm not aware of?


