Posted to issues@spark.apache.org by "Li Jin (JIRA)" <ji...@apache.org> on 2019/06/11 22:24:00 UTC
[jira] [Commented] (SPARK-28006) User-defined grouped transform
pandas_udf for window operations
[ https://issues.apache.org/jira/browse/SPARK-28006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16861559#comment-16861559 ]
Li Jin commented on SPARK-28006:
--------------------------------
cc [~hyukjin.kwon] [~LI,Xiao] [~ueshin] [~bryanc]
I think code-wise this is pretty simple, but since this adds a new pandas udf type, I'd like to get some feedback on it.
> User-defined grouped transform pandas_udf for window operations
> ---------------------------------------------------------------
>
> Key: SPARK-28006
> URL: https://issues.apache.org/jira/browse/SPARK-28006
> Project: Spark
> Issue Type: New Feature
> Components: PySpark
> Affects Versions: 2.4.3
> Reporter: Li Jin
> Priority: Major
>
> Currently, pandas_udf supports the "grouped aggregate" type, which can be used with bounded and unbounded windows. There is another set of use cases that can benefit from a "grouped transform" type of pandas_udf.
> Grouped transform is defined as a N -> N mapping over a group. For example, "compute zscore for values in the group using the grouped mean and grouped stdev", or "rank the values in the group".
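As a point of reference, the same N -> N grouped-transform semantics already exist in plain pandas via groupby(...).transform(...) and groupby(...).rank(). A minimal sketch (the data and column names are illustrative, not from the issue):

```python
import pandas as pd

# toy data; 'id' is the grouping key, 'v' the value column
df = pd.DataFrame({
    "id": [1, 1, 1, 2, 2],
    "v":  [1.0, 2.0, 3.0, 10.0, 20.0],
})

# zscore within each group: the result has the same length as the input (N -> N)
df["v_zscore"] = df.groupby("id")["v"].transform(lambda v: (v - v.mean()) / v.std())

# rank within each group, also an N -> N mapping
df["v_rank"] = df.groupby("id")["v"].rank()
```

Note that, unlike an aggregation, the output aligns row-for-row with the input, which is exactly the shape a window transform needs.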
>
> Currently, in order to do this, the user needs to use "grouped apply", for example:
>
> {code:python}
> @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
> def zscore(pdf):
>     v = pdf['v']
>     pdf['v'] = (v - v.mean()) / v.std()
>     return pdf
>
> df.groupby('id').apply(zscore){code}
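For comparison, the grouped-apply pattern above can be sketched in plain pandas, where the whole frame for each group passes through the function even though only 'v' is used (the data and the 'extra' column are made up for illustration):

```python
import pandas as pd

def zscore(pdf: pd.DataFrame) -> pd.DataFrame:
    pdf = pdf.copy()          # avoid mutating the group frame passed in by apply
    v = pdf["v"]
    pdf["v"] = (v - v.mean()) / v.std()
    return pdf                # every column comes back, even untouched ones

df = pd.DataFrame({
    "id":    [1, 1, 2, 2],
    "v":     [1.0, 3.0, 2.0, 4.0],
    "extra": ["a", "b", "c", "d"],   # shipped through the udf despite being unused
})
out = df.groupby("id", group_keys=False).apply(zscore)
```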
> This approach has a few downsides:
>
> * Specifying the full return schema is complicated for the user, even though the function only changes one column.
> * Hard-coding the column name 'v' inside the udf makes the udf less reusable.
> * The entire dataframe is serialized and sent to Python, although only one column is needed.
> Here we propose a new type of pandas_udf to work with these types of use cases:
> {code:python}
> @pandas_udf('double', PandasUDFType.GROUPED_XFORM)
> def zscore(v):
>     return (v - v.mean()) / v.std()
>
> w = Window.partitionBy('id')
> df = df.withColumn('v_zscore', zscore(df['v']).over(w)){code}
> This addresses the downsides above:
> * The user only needs to specify the output type of a single column.
> * The column being zscored is decoupled from the udf implementation.
> * Only one column needs to be sent to the Python worker, and the result is concatenated with the original dataframe (this is what grouped aggregate already does).
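The intended data flow can be sketched in plain pandas: the udf body receives only the single column per group, and the transformed series aligns by index so it can be attached back to the original frame. This is a sketch of the semantics only, not the actual Spark internals; the data and the 'other' column are illustrative:

```python
import pandas as pd

# udf body: receives only the value column, returns a same-length series
def zscore(v: pd.Series) -> pd.Series:
    return (v - v.mean()) / v.std()

df = pd.DataFrame({
    "id":    [1, 1, 2, 2],
    "v":     [1.0, 3.0, 10.0, 30.0],
    "other": ["a", "b", "c", "d"],   # never handed to the udf
})

# only df['v'] (grouped by 'id') reaches zscore; the result aligns by index,
# so it can be concatenated straight onto the original dataframe
df["v_zscore"] = df.groupby("id")["v"].transform(zscore)
```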
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org