You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dongjoon Hyun (JIRA)" <ji...@apache.org> on 2019/08/05 05:19:00 UTC
[jira] [Updated] (SPARK-28422) GROUPED_AGG pandas_udf doesn't with
spark.sql() without group by clause
[ https://issues.apache.org/jira/browse/SPARK-28422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-28422:
----------------------------------
Description:
{code:java}
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def max_udf(v):
return v.max()
df = spark.range(0, 100)
spark.udf.register('max_udf', max_udf)
df.createTempView('table')
# A. This works
df.agg(max_udf(df['id'])).show()
# B. This doesn't work
spark.sql("select max_udf(id) from table").show(){code}
Query plan:
A:
{code:java}
== Parsed Logical Plan ==
'Aggregate [max_udf('id) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Analyzed Logical Plan ==
max_udf(id): double
Aggregate [max_udf(id#64L) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Optimized Logical Plan ==
Aggregate [max_udf(id#64L) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Physical Plan ==
!AggregateInPandas [max_udf(id#64L)], [max_udf(id)#138 AS max_udf(id)#140]
+- Exchange SinglePartition
+- *(1) Range (0, 1000, step=1, splits=4)
{code}
B:
{code:java}
== Parsed Logical Plan ==
'Project [unresolvedalias('max_udf('id), None)]
+- 'UnresolvedRelation [table]
== Analyzed Logical Plan ==
max_udf(id): double
Project [max_udf(id#0L) AS max_udf(id)#136]
+- SubqueryAlias `table`
+- Range (0, 100, step=1, splits=Some(4))
== Optimized Logical Plan ==
Project [max_udf(id#0L) AS max_udf(id)#136]
+- Range (0, 100, step=1, splits=Some(4))
== Physical Plan ==
*(1) Project [max_udf(id#0L) AS max_udf(id)#136]
+- *(1) Range (0, 100, step=1, splits=4)
{code}
was:
{code:java}
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def max_udf(v):
return v.max()
df = spark.range(0, 100)
df.udf.register('max_udf', max_udf)
df.createTempView('table')
# A. This works
df.agg(max_udf(df['id'])).show()
# B. This doesn't work
spark.sql("select max_udf(id) from table").show(){code}
Query plan:
A:
{code:java}
== Parsed Logical Plan ==
'Aggregate [max_udf('id) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Analyzed Logical Plan ==
max_udf(id): double
Aggregate [max_udf(id#64L) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Optimized Logical Plan ==
Aggregate [max_udf(id#64L) AS max_udf(id)#140]
+- Range (0, 1000, step=1, splits=Some(4))
== Physical Plan ==
!AggregateInPandas [max_udf(id#64L)], [max_udf(id)#138 AS max_udf(id)#140]
+- Exchange SinglePartition
+- *(1) Range (0, 1000, step=1, splits=4)
{code}
B:
{code:java}
== Parsed Logical Plan ==
'Project [unresolvedalias('max_udf('id), None)]
+- 'UnresolvedRelation [table]
== Analyzed Logical Plan ==
max_udf(id): double
Project [max_udf(id#0L) AS max_udf(id)#136]
+- SubqueryAlias `table`
+- Range (0, 100, step=1, splits=Some(4))
== Optimized Logical Plan ==
Project [max_udf(id#0L) AS max_udf(id)#136]
+- Range (0, 100, step=1, splits=Some(4))
== Physical Plan ==
*(1) Project [max_udf(id#0L) AS max_udf(id)#136]
+- *(1) Range (0, 100, step=1, splits=4)
{code}
> GROUPED_AGG pandas_udf doesn't with spark.sql() without group by clause
> -----------------------------------------------------------------------
>
> Key: SPARK-28422
> URL: https://issues.apache.org/jira/browse/SPARK-28422
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 2.4.3
> Reporter: Li Jin
> Priority: Major
>
>
> {code:java}
> @pandas_udf('double', PandasUDFType.GROUPED_AGG)
> def max_udf(v):
> return v.max()
> df = spark.range(0, 100)
> spark.udf.register('max_udf', max_udf)
> df.createTempView('table')
> # A. This works
> df.agg(max_udf(df['id'])).show()
> # B. This doesn't work
> spark.sql("select max_udf(id) from table").show(){code}
>
>
> Query plan:
> A:
> {code:java}
> == Parsed Logical Plan ==
> 'Aggregate [max_udf('id) AS max_udf(id)#140]
> +- Range (0, 1000, step=1, splits=Some(4))
> == Analyzed Logical Plan ==
> max_udf(id): double
> Aggregate [max_udf(id#64L) AS max_udf(id)#140]
> +- Range (0, 1000, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Aggregate [max_udf(id#64L) AS max_udf(id)#140]
> +- Range (0, 1000, step=1, splits=Some(4))
> == Physical Plan ==
> !AggregateInPandas [max_udf(id#64L)], [max_udf(id)#138 AS max_udf(id)#140]
> +- Exchange SinglePartition
> +- *(1) Range (0, 1000, step=1, splits=4)
> {code}
> B:
> {code:java}
> == Parsed Logical Plan ==
> 'Project [unresolvedalias('max_udf('id), None)]
> +- 'UnresolvedRelation [table]
> == Analyzed Logical Plan ==
> max_udf(id): double
> Project [max_udf(id#0L) AS max_udf(id)#136]
> +- SubqueryAlias `table`
> +- Range (0, 100, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Project [max_udf(id#0L) AS max_udf(id)#136]
> +- Range (0, 100, step=1, splits=Some(4))
> == Physical Plan ==
> *(1) Project [max_udf(id#0L) AS max_udf(id)#136]
> +- *(1) Range (0, 100, step=1, splits=4)
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org