Posted to issues@spark.apache.org by "Li Jin (JIRA)" <ji...@apache.org> on 2019/06/13 15:37:00 UTC

[jira] [Comment Edited] (SPARK-28006) User-defined grouped transform pandas_udf for window operations

    [ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16863186#comment-16863186 ] 

Li Jin edited comment on SPARK-28006 at 6/13/19 3:36 PM:
---------------------------------------------------------

Hi [~viirya], good questions!

>> Can we use pandas agg udfs as window function?

Pandas agg udfs are supported as window functions, with both unbounded and bounded windows.
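For example, something along these lines works with the existing GROUPED_AGG type (a minimal sketch; df is the example DataFrame from the issue description below):

{code}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.window import Window

# Existing grouped aggregate pandas udf: takes a pandas Series, returns a scalar.
@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def mean_udf(v):
    return v.mean()

# Unbounded frame: one aggregate per group, broadcast to every row of the group.
w_unbounded = Window.partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Bounded frame: rolling mean over the previous row and the current row.
w_bounded = Window.partitionBy('id').orderBy('v').rowsBetween(-1, 0)

df.withColumn('mean_v', mean_udf(df['v']).over(w_unbounded)).show()
df.withColumn('rolling_mean_v', mean_udf(df['v']).over(w_bounded)).show()
{code}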

>> Because the proposed GROUPED_XFORM udf calculates output values for all rows in the group, looks like the proposed GROUPED_XFORM udf can only use window frame (UnboundedPreceding, UnboundedFollowing)

This is correct. It is really using an unbounded window as the group here (because there is no groupby-transform API in Spark SQL).
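In other words, Window.partitionBy('id') with no ordering already implies the full-partition frame, so the udf sees the whole group at once. Spelling the frame out explicitly gives the same thing (small sketch):

{code}
from pyspark.sql.window import Window

# Without an orderBy, a partition-only window defaults to an unbounded frame,
# so these two definitions describe the same set of rows per group.
w_implicit = Window.partitionBy('id')
w_explicit = Window.partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
{code}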



> User-defined grouped transform pandas_udf for window operations
> ---------------------------------------------------------------
>
>                 Key: SPARK-28006
>                 URL: https://issues.apache.org/jira/browse/SPARK-28006
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 2.4.3
>            Reporter: Li Jin
>            Priority: Major
>
> Currently, pandas_udf supports a "grouped aggregate" type that can be used with unbounded and bounded windows. There is another set of use cases that would benefit from a "grouped transform" type pandas_udf.
> Grouped transform is defined as an N -> N mapping over a group. For example, "compute the zscore for values in the group using the grouped mean and grouped stdev", or "rank the values in the group".
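> To make the shape concrete, this is the same operation pandas exposes as a groupby transform (a plain-pandas sketch for illustration only, not part of the proposed API):
> {code}
> import pandas as pd
>
> pdf = pd.DataFrame({'id': [1, 1, 2, 2, 2], 'v': [1.0, 2.0, 3.0, 5.0, 10.0]})
> # N -> N: every row gets a value computed from its own group.
> pdf['v_demeaned'] = pdf.groupby('id')['v'].transform(lambda v: v - v.mean())
> {code}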
> Currently, in order to do this, the user needs to use "grouped apply", for example:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> # The full output schema must be spelled out even though only 'v' changes.
> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
> def subtract_mean(pdf):
>     # Receives each group as a pandas DataFrame and must return a DataFrame.
>     v = pdf['v']
>     pdf['v'] = v - v.mean()
>     return pdf
>
> df.groupby('id').apply(subtract_mean).show()
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This approach has a few downsides:
>  * Specifying the full return schema is complicated for the user, even though the function only changes one column.
>  * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
>  * The entire dataframe is serialized and sent to Python, although only one column is needed.
> Here we propose a new type of pandas_udf to work with these types of use cases:
> {code:java}
> df = spark.createDataFrame(
>     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
>     ("id", "v"))
> @pandas_udf('double', GROUPED_XFORM)
> def subtract_mean(v):
>     return v - v.mean()
> w = Window.partitionBy('id')
> df = df.withColumn('v', subtract_mean(df['v']).over(w))
> # +---+----+
> # | id|   v|
> # +---+----+
> # |  1|-0.5|
> # |  1| 0.5|
> # |  2|-3.0|
> # |  2|-1.0|
> # |  2| 4.0|
> # +---+----+{code}
> This addresses the above downsides:
>  * The user only needs to specify the output type of a single column.
>  * The column being transformed is decoupled from the udf implementation.
>  * Only one column needs to be sent to the Python worker, and the result is concatenated with the original dataframe (this is what grouped aggregate already does).
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org