Posted to issues@spark.apache.org by "Pablo Langa Blanco (Jira)" <ji...@apache.org> on 2021/03/30 23:04:00 UTC

[jira] [Commented] (SPARK-34464) `first` function is sorting the dataset while sometimes it is used to get "any value"

    [ https://issues.apache.org/jira/browse/SPARK-34464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17311845#comment-17311845 ] 

Pablo Langa Blanco commented on SPARK-34464:
--------------------------------------------

Hi [~lfruleux],

Here is a link that explains very well when each of the different aggregation strategies is applied.

[https://www.waitingforcode.com/apache-spark-sql/aggregations-execution-apache-spark-sql/read]

In the case you describe, two things make the aggregation fall back to SortAggregate. The first is that the types involved in the aggregation are not primitive mutable types, which HashAggregate requires. The second is that the next candidate, ObjectHashAggregate, does not support the `first` function because it is not a TypedImperativeAggregate, so the planner falls back to SortAggregate.
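
The fallback order described above can be modelled with a small self-contained sketch. This is a simplified illustration, not Spark's actual code: the `AggFunction` case class, its two boolean fields, and the `choose` method are hypothetical names introduced here, and the real planner inspects aggregation buffer schemas and expression classes rather than flags.

```scala
object AggregateSelectionSketch {
  // Hypothetical stand-in for what the planner knows about one aggregate function.
  final case class AggFunction(
      name: String,
      bufferIsMutablePrimitive: Boolean, // e.g. the buffer holds only Int/Long/Double values
      isTypedImperative: Boolean)        // e.g. collect_list

  sealed trait PhysicalAggregate
  case object HashAggregate extends PhysicalAggregate
  case object ObjectHashAggregate extends PhysicalAggregate
  case object SortAggregate extends PhysicalAggregate

  // Try HashAggregate first, then ObjectHashAggregate, and only then SortAggregate.
  def choose(
      functions: Seq[AggFunction],
      objectHashEnabled: Boolean = true): PhysicalAggregate =
    if (functions.forall(_.bufferIsMutablePrimitive)) HashAggregate
    else if (objectHashEnabled && functions.exists(_.isTypedImperative)) ObjectHashAggregate
    else SortAggregate
}
```

Under these assumptions, `first` over a string column (non-primitive buffer, not a TypedImperativeAggregate) ends up in SortAggregate, while `first` over an integer column stays in HashAggregate.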

I don't know whether there is a reason for this restriction. I'm going to look into whether the fallback to ObjectHashAggregate could be allowed in this case as well.

Thanks!

> `first` function is sorting the dataset while sometimes it is used to get "any value"
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-34464
>                 URL: https://issues.apache.org/jira/browse/SPARK-34464
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Louis Fruleux
>            Priority: Minor
>
> When one wants to groupBy and take any value (not necessarily the first), one usually uses [first|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L485] aggregation function.
> Unfortunately, this method uses a `SortAggregate` for some data types, which is not always necessary and might hurt performance. Is this the desired behavior?
>  
>  
> {code:java}
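> // Current behavior (run in spark-shell, where spark.implicits._ is already in scope):
> import org.apache.spark.sql.functions.first
>  val df = Seq((0, "value")).toDF("key", "value")
> df.groupBy("key").agg(first("value")).explain()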
>  /*
>  == Physical Plan ==
>  SortAggregate(key=key#342, functions=first(value#343, false))
>  +- *(2) Sort key#342 ASC NULLS FIRST, false, 0
>     +- Exchange hashpartitioning(key#342, 200)
>        +- SortAggregate(key=key#342, functions=partial_first(value#343, false))
>           +- *(1) Sort key#342 ASC NULLS FIRST, false, 0
>              +- LocalTableScan key#342, value#343
>  */
> {code}
>  
> My understanding of the source code does not allow me to fully understand why this is the current behavior.
> The solution might be to implement a new aggregate function, but the code would be highly similar to that of `first`. I also don't fully understand why this [createAggregate|https://github.com/apache/spark/blob/3a299aa6480ac22501512cd0310d31a441d7dfdc/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L45] method falls back to SortAggregate.


