Posted to issues@spark.apache.org by "L. C. Hsieh (Jira)" <ji...@apache.org> on 2020/07/04 06:56:00 UTC

[jira] [Commented] (SPARK-32169) Allow filter pushdown after a groupBy with collect_list

    [ https://issues.apache.org/jira/browse/SPARK-32169?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17151196#comment-17151196 ] 

L. C. Hsieh commented on SPARK-32169:
-------------------------------------

collect_list is a non-deterministic expression. The Spark SQL optimizer cannot push a predicate down through such expressions.

> Allow filter pushdown after a groupBy with collect_list
> -------------------------------------------------------
>
>                 Key: SPARK-32169
>                 URL: https://issues.apache.org/jira/browse/SPARK-32169
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.6, 3.0.0
>            Reporter: Jorge
>            Priority: Major
>              Labels: performance
>
> Consider the following statement:
> {code:python}
> import pyspark
> import pyspark.sql.functions as f
> with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
>     spark = pyspark.sql.SQLContext(sc)
>     df = spark.createDataFrame([
>         [2020, 1, 1, 1.0],
>         [2020, 1, 2, 2.0],
>         [2020, 1, 3, 3.0],
>     ], schema=['year', 'id', 't', 'value'])
>     df = df.groupBy(['year', 'id']).agg(f.collect_list('value'))
>     df = df.where(f.col('year') == 2020)
>     df.explain()
> {code}
>  
> which yields the plan
> {code:bash}
> == Physical Plan ==
> *(2) Filter (isnotnull(year#0L) AND (year#0L = 2020))
> +- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[collect_list(value#3, 0, 0)])
>    +- Exchange hashpartitioning(year#0L, id#1L, 200), true, [id=#23]
>       +- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[partial_collect_list(value#3, 0, 0)])
>          +- *(1) Project [year#0L, id#1L, value#3]
>             +- *(1) Scan ExistingRDD[year#0L,id#1L,t#2L,value#3]
> {code}
> This issue is about allowing the `Filter` to be placed before the `hashpartitioning` exchange in the plan, since there seems to be no reason not to allow it.
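
The reporter's argument can be illustrated with a minimal plain-Python model. This is not Spark code: the helper names and the extra 2019 row are made up for illustration. It only suggests why a filter that references nothing but the grouping keys yields the same groups whether it runs before or after the aggregation, and it says nothing about how the optimizer decides what is safe to push down.

```python
from collections import defaultdict

# Rows from the example in the issue: (year, id, t, value).
ROWS = [
    (2020, 1, 1, 1.0),
    (2020, 1, 2, 2.0),
    (2020, 1, 3, 3.0),
    (2019, 1, 1, 9.0),  # hypothetical extra row so the filter drops something
]


def group_collect(rows):
    """Group by (year, id) and collect the values of each group in input order."""
    groups = defaultdict(list)
    for year, id_, _t, value in rows:
        groups[(year, id_)].append(value)
    return dict(groups)


def filter_after(rows, year):
    """Aggregate first, then filter on the grouping key (the plan shown in the issue)."""
    return {key: vals for key, vals in group_collect(rows).items() if key[0] == year}


def filter_before(rows, year):
    """Filter first, then aggregate (the plan the issue asks for)."""
    return group_collect([row for row in rows if row[0] == year])


after = filter_after(ROWS, 2020)
before = filter_before(ROWS, 2020)
print(after)
print(before)
```

In this model both orderings keep the same single group. In the DataFrame API, the same reordering can be done by hand by calling `where` before `groupBy`.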


