Posted to issues@spark.apache.org by "Jorge (Jira)" <ji...@apache.org> on 2020/07/04 06:23:00 UTC

[jira] [Updated] (SPARK-32169) Allow filter pushdown after a groupBy with collect_list

     [ https://issues.apache.org/jira/browse/SPARK-32169?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge updated SPARK-32169:
--------------------------
    Description: 
Consider the following statement:


{code:python}
import pyspark
import pyspark.sql.functions as f


with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
    spark = pyspark.sql.SQLContext(sc)

    df = spark.createDataFrame([
        [2020, 1, 1, 1.0],
        [2020, 1, 2, 2.0],
        [2020, 1, 3, 3.0],
    ], schema=['year', 'id', 't', 'value'])

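    # Note that the filter on the grouping key 'year' comes after the aggregation.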
    df = df.groupBy(['year', 'id']).agg(f.collect_list('value'))
    df = df.where(f.col('year') == 2020)
    df.explain()
{code}
 

which yields the plan

{code}
== Physical Plan ==
*(2) Filter (isnotnull(year#0L) AND (year#0L = 2020))
+- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[collect_list(value#3, 0, 0)])
   +- Exchange hashpartitioning(year#0L, id#1L, 200), true, [id=#23]
      +- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[partial_collect_list(value#3, 0, 0)])
         +- *(1) Project [year#0L, id#1L, value#3]
            +- *(1) Scan ExistingRDD[year#0L,id#1L,t#2L,value#3]
{code}
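
The predicate commutes with the aggregation here because it references only the grouping key `year`, so filtering first yields the same result. A minimal sketch checking that equivalence (variable names are illustrative; `sort_array` is applied because `collect_list` gives no ordering guarantee):

{code:python}
import pyspark
import pyspark.sql.functions as f


with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
    spark = pyspark.sql.SQLContext(sc)

    df = spark.createDataFrame([
        [2019, 1, 1, 0.5],
        [2020, 1, 1, 1.0],
        [2020, 1, 2, 2.0],
    ], schema=['year', 'id', 't', 'value'])

    # Sort the collected arrays so the comparison does not depend on the
    # (unspecified) order in which collect_list gathers values.
    agg_expr = f.sort_array(f.collect_list('value')).alias('values')

    filter_after = (df.groupBy(['year', 'id']).agg(agg_expr)
                      .where(f.col('year') == 2020))
    filter_before = (df.where(f.col('year') == 2020)
                       .groupBy(['year', 'id']).agg(agg_expr))

    # Same rows either way, since the predicate touches only a grouping key.
    assert sorted(filter_after.collect()) == sorted(filter_before.collect())
{code}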

This issue is about allowing the `Filter` to be pushed below the `Exchange` (`hashpartitioning`) in the plan, i.e. evaluated before the shuffle. As written, every row is shuffled and aggregated before the filter discards the non-matching groups, and given the equivalence above there appears to be no reason not to allow this.
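
Until the optimizer performs this rewrite itself, a manual workaround is to apply the filter before the `groupBy`. A sketch with the same data as above; its `explain()` output should place the `Filter` next to the scan, below the `Exchange`, so that only matching rows are shuffled:

{code:python}
import pyspark
import pyspark.sql.functions as f


with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
    spark = pyspark.sql.SQLContext(sc)

    df = spark.createDataFrame([
        [2020, 1, 1, 1.0],
        [2020, 1, 2, 2.0],
        [2020, 1, 3, 3.0],
    ], schema=['year', 'id', 't', 'value'])

    # Filter on the grouping key *before* aggregating; this is the plan shape
    # the optimizer should be able to produce for the filter-after-groupBy form.
    df = df.where(f.col('year') == 2020)
    df = df.groupBy(['year', 'id']).agg(f.collect_list('value'))
    df.explain()
{code}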


> Allow filter pushdown after a groupBy with collect_list
> -------------------------------------------------------
>
>                 Key: SPARK-32169
>                 URL: https://issues.apache.org/jira/browse/SPARK-32169
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.6, 3.0.0
>            Reporter: Jorge
>            Priority: Major
>              Labels: performance
>


