Posted to issues@spark.apache.org by "Jorge (Jira)" <ji...@apache.org> on 2020/07/04 06:22:00 UTC
[jira] [Created] (SPARK-32169) Allow filter pushdown after a groupBy with collect_list
Jorge created SPARK-32169:
-----------------------------
Summary: Allow filter pushdown after a groupBy with collect_list
Key: SPARK-32169
URL: https://issues.apache.org/jira/browse/SPARK-32169
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.0.0, 2.4.6
Reporter: Jorge
Consider the following statement:
```python
import pyspark
import pyspark.sql.functions as f

with pyspark.SparkContext(conf=pyspark.SparkConf().setMaster('local[*]')) as sc:
    spark = pyspark.sql.SQLContext(sc)

    df = spark.createDataFrame([
        [2020, 1, 1, 1.0],
        [2020, 1, 2, 2.0],
        [2020, 1, 3, 3.0],
    ], schema=['year', 'id', 't', 'value'])

    # Aggregate per (year, id) with collect_list, then filter on a grouping column.
    df = df.groupBy(['year', 'id']).agg(f.collect_list('value'))
    df = df.where(f.col('year') == 2020)

    df.explain()
```
which yields the plan
```
== Physical Plan ==
*(2) Filter (isnotnull(year#0L) AND (year#0L = 2020))
+- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[collect_list(value#3, 0, 0)])
   +- Exchange hashpartitioning(year#0L, id#1L, 200), true, [id=#23]
      +- ObjectHashAggregate(keys=[year#0L, id#1L], functions=[partial_collect_list(value#3, 0, 0)])
         +- *(1) Project [year#0L, id#1L, value#3]
            +- *(1) Scan ExistingRDD[year#0L,id#1L,t#2L,value#3]
```
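This issue is about allowing the `Filter` to be placed before `hashpartitioning` (in the plan), so that rows are discarded before the shuffle. The filter only references the grouping column `year`, so there seems to be no reason not to allow this.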
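The reasoning above can be illustrated without Spark. The following is a minimal pure-Python sketch, not Spark's implementation: `collect_by_key` is a hypothetical helper that mimics `groupBy(['year', 'id'])` with `collect_list('value')`, and the 2019 row is an invented addition so that the filter actually removes something. It only shows that, for a predicate on a grouping column, filtering before or after the aggregation gives the same groups; it ignores the order of elements within each list, which Spark does not guarantee for `collect_list`.

```python
from collections import defaultdict

# Rows from the report as (year, id, t, value), plus one hypothetical
# 2019 row (not in the original) so the filter has something to drop.
rows = [
    (2020, 1, 1, 1.0),
    (2020, 1, 2, 2.0),
    (2020, 1, 3, 3.0),
    (2019, 1, 1, 9.0),
]


def collect_by_key(rows):
    """Group rows by (year, id) and collect their values into a list."""
    groups = defaultdict(list)
    for year, id_, _t, value in rows:
        groups[(year, id_)].append(value)
    return dict(groups)


# Filter applied after the aggregation, as in the plan above.
filter_after = {
    key: values
    for key, values in collect_by_key(rows).items()
    if key[0] == 2020
}

# Filter applied before the aggregation, as the issue proposes.
filter_before = collect_by_key([row for row in rows if row[0] == 2020])

print(filter_after == filter_before)  # True
print(filter_before)  # {(2020, 1): [1.0, 2.0, 3.0]}
```

Until the optimizer does this automatically, the same reordering can be applied by hand in the statement above by calling `where` on the input DataFrame before `groupBy`.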