Posted to issues@spark.apache.org by "Weichen Xu (JIRA)" <ji...@apache.org> on 2019/05/09 13:26:00 UTC

[jira] [Commented] (SPARK-26412) Allow Pandas UDF to take an iterator of pd.DataFrames or Arrow batches

    [ https://issues.apache.org/jira/browse/SPARK-26412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836377#comment-16836377 ] 

Weichen Xu commented on SPARK-26412:
------------------------------------

[~mengxr]

There's one issue. There are two proposals in this JIRA ([SPARK-26412]):

Proposal (1)
    Simply provide users an iterator of batches (pd.DataFrame or Arrow table) and let user code handle it, like:
{code:python}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StringType

@pandas_udf(StringType(), PandasUDFType.SCALAR_ITERATOR)
def udf1(batches):
    # do some initialization once, e.g. load a model
    for batch in batches:
        # compute the result for each batch; astype(str) is a placeholder
        yield batch.astype(str)
{code}
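For context, such a UDF would be applied like any other scalar pandas UDF; a minimal sketch, assuming the SCALAR_ITERATOR type lands as proposed:
{code:python}
# Each Python worker's udf1 receives an iterator over batches of df.col0.
df.select(udf1(df.col0))
{code}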
 
 
Proposal (2)
    For scalar pandas UDFs, support "start()" and "finish()" hooks besides "apply", as sketched below.
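A rough illustration of what proposal (2) could look like from the user's side. This start/apply/finish interface is hypothetical (it is not an actual PySpark API), and the trivial "model" is a stand-in so the sketch is self-contained:
{code:python}
import pandas as pd

# Hypothetical sketch only: PySpark does not expose this interface.
class PredictUDF:
    def start(self):
        # one-time initialization per Python worker, e.g. load a model;
        # a trivial stand-in "model" keeps the sketch self-contained
        self.prefix = "pred_"

    def apply(self, batch: pd.Series) -> pd.Series:
        # called once per batch
        return self.prefix + batch.astype(str)

    def finish(self):
        # one-time cleanup after the last batch
        self.prefix = None
{code}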
 
But scalar pandas UDF evaluation needs to support computing multiple UDFs over the same rows at the same time;
see [https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L287]
Such as:
{code:python}
df.select(f0(df.col0), f1(df.col1, df.col2), f2(df.col3))
{code}
 
So proposal (1) cannot work here, because the "start", "apply", and "finish" logic are mixed in one generator function and cannot be split apart. What we need instead is (see the sketch after this list):
1) run the "start" method of each UDF f0, f1, f2 (if required);
2) for each row coming from the iterator, run the "apply" method of f0, f1, f2 on that row;
3) run the "finish" method of each UDF f0, f1, f2 (if required).
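A simplified sketch of the per-batch loop the Python worker would have to run under proposal (2). The names udfs, arg_offsets, batch_iterator, and write_output are illustrative stand-ins, not the actual worker.py identifiers:
{code:python}
# Illustrative only: a simplified shape of the loop in python/pyspark/worker.py.
# udfs is a list of objects with start/apply/finish (proposal (2));
# arg_offsets[i] holds the input-column positions for udfs[i].
def eval_udfs(udfs, arg_offsets, batch_iterator, write_output):
    for udf in udfs:                      # e.g. [f0, f1, f2]
        udf.start()                       # per-UDF initialization (if required)
    for batch in batch_iterator:
        # every UDF is applied to its own columns of the *same* batch
        results = [udf.apply(*[batch[o] for o in offsets])
                   for udf, offsets in zip(udfs, arg_offsets)]
        write_output(results)             # this batch's results leave together
    for udf in udfs:
        udf.finish()                      # per-UDF cleanup (if required)
{code}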

So only proposal (2) works.

> Allow Pandas UDF to take an iterator of pd.DataFrames or Arrow batches
> ----------------------------------------------------------------------
>
>                 Key: SPARK-26412
>                 URL: https://issues.apache.org/jira/browse/SPARK-26412
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.0.0
>            Reporter: Xiangrui Meng
>            Assignee: Weichen Xu
>            Priority: Major
>
> Pandas UDF is the ideal connection between PySpark and DL model inference workloads. However, the user needs to load the model file first to make predictions. It is common to see models of size ~100MB or bigger. If Pandas UDF execution is limited to batch scope, the user needs to repeatedly load the same model for every batch in the same Python worker process, which is inefficient. I created this JIRA to discuss possible solutions.
> Essentially we need to support "start()" and "finish()" besides "apply". We can either provide those interfaces or simply provide users an iterator of batches as pd.DataFrames or Arrow tables and let user code handle it.
> Another benefit: with the iterator interface and Python's asyncio, users have the flexibility to implement data pipelining.
> cc: [~icexelloss] [~bryanc] [~holdenk] [~hyukjin.kwon] [~ueshin] [~smilegator]
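To make the pipelining point from the description above concrete, here is a minimal sketch using a plain background thread instead of asyncio (the buffering idea is the same); prefetched is a hypothetical helper, not a PySpark API:
{code:python}
import queue
import threading

def prefetched(batches, depth=2):
    # Wrap an iterator of batches so that fetching the next batch
    # overlaps with computation on the current one.
    q = queue.Queue(maxsize=depth)
    sentinel = object()

    def fill():
        for b in batches:
            q.put(b)
        q.put(sentinel)

    threading.Thread(target=fill, daemon=True).start()
    while True:
        b = q.get()
        if b is sentinel:
            return
        yield b
{code}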


