Posted to issues@spark.apache.org by "Chris Martin (JIRA)" <ji...@apache.org> on 2019/04/15 10:40:00 UTC

[jira] [Created] (SPARK-27463) SPIP: Support Dataframe Cogroup via Pandas UDFs

Chris Martin created SPARK-27463:
------------------------------------

             Summary: SPIP: Support Dataframe Cogroup via Pandas UDFs 
                 Key: SPARK-27463
                 URL: https://issues.apache.org/jira/browse/SPARK-27463
             Project: Spark
          Issue Type: Improvement
          Components: PySpark, SQL
    Affects Versions: 3.0.0
            Reporter: Chris Martin


h2. *Background and Motivation*

Recently there has been a great deal of work in PySpark to improve interoperability with the Pandas library. This work has allowed users to write User Defined Functions (UDFs) in Pandas which can then be applied to a Spark DataFrame. The benefit here is that it allows users to combine the functionality of Pandas with the parallelisation abilities of Spark. In addition, these new Pandas UDFs have significantly lower overhead than traditional UDFs as they operate on a batch of data at a time (i.e. they are vectorised) and they use Apache Arrow for serialisation between the JVM and Python processes.

As of Spark 2.3, two types of Pandas UDF are offered. Scalar UDFs effectively offer a map operation at the row level, while Grouped Map UDFs allow a map operation on a group of data. This functionality has proved successful in allowing users to integrate Spark with existing Pandas workflows; however, there are situations where the existing functionality is not sufficient. One such case is analogous to the existing Cogroup functionality available on RDDs and Datasets and was proposed by Li Jin on the Spark-Dev mailing list[1]. In this case, the user would like to group two Spark DataFrames by a common key and then apply a Python function to each group. This Python function would take two Pandas DataFrames as its arguments and return a Pandas DataFrame of arbitrary length.

To give a concrete example of the usefulness of this functionality, consider the use case of performing an as-of join between two distinct DataFrames. This is something that has traditionally been very difficult to do in Spark (and indeed in SQL in general)[2] but which has good support in Pandas[3]. If Cogroup-like functionality were available in PySpark then one could simply write a Pandas function to perform the as-of join, which could then be applied to two (appropriately grouped) DataFrames.
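To make the as-of join concrete, the following is a minimal, purely local Pandas sketch (the column names and data are illustrative only and are not part of this proposal):

{code:java}
import pandas as pd

# Illustrative data: trades we want to enrich and the quotes to join against.
# merge_asof requires both frames to be sorted by the 'on' key.
trades = pd.DataFrame({'time': [2, 5, 9], 'product_id': ['a', 'a', 'a'], 'quantity': [10, 20, 30]})
quotes = pd.DataFrame({'time': [1, 4, 8], 'product_id': ['a', 'a', 'a'], 'price': [1.0, 1.1, 1.2]})

# Each trade is matched with the most recent quote at or before its time.
joined = pd.merge_asof(trades, quotes, on='time', by='product_id'){code}

Pandas can do this for data that fits on a single machine; the point of this proposal is to allow the same per-group logic to be applied to two Spark DataFrames in parallel.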

This proposal therefore advocates introducing a new API call which would allow for a Cogrouped Pandas UDF.

[1][http://mail-archives.apache.org/mod_mbox/spark-dev/201902.mbox/%3cCAGY9duXt569bpGp0WSC2eSJgcoo5+HbFiHFBkOFCOccLmJh_ZA@mail.gmail.com%3e]

[2] See [https://issues.apache.org/jira/browse/SPARK-22947] for a SPIP that aims to add as-of join functionality to Spark.

[3][https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.merge_asof.html]
h2. *API Changes*

The public API changes would all be on the PySpark side. In terms of the API itself there are a couple of options, depending on whether the goal is syntactic brevity or consistency with the Dataset version of cogroup. If brevity is the aim then a new method can be added to the DataFrame class:

 
{code:java}
# other is another DataFrame, on is the cogroup key, udf is the function to apply.
def cogroup(self, other, on, udf):
    ...
{code}
 

Alternatively, to be consistent with the Dataset version of cogroup, a new method could be added to the GroupedData class.

 
{code:java}
# other is another GroupedData, udf is the function to apply.
def cogroup(self, other, udf):
    ...
{code}
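Purely as an illustration, invocation of this variant might look something like the sketch below (asof_join is the cogrouped Pandas UDF defined in the example later in this document; the exact method and keyword names are not settled):

{code:java}
# Hypothetical invocation of the GroupedData variant: group both Spark
# DataFrames by the cogroup key, then apply the cogrouped Pandas UDF.
gd1 = df1.groupby('product_id')
gd2 = df2.groupby('product_id')
result = gd1.cogroup(gd2, udf=asof_join)
{code}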
 

The exact API can be worked out as part of this SPIP and the document will be updated once a decision has been reached.

In addition, a new PandasUDFType, COGROUPED_MAP, will be defined to identify this new type of UDF. Functions annotated with this decorator should take two Pandas DataFrames and return a single Pandas DataFrame. Here is an example of usage, using the as-of join use case described earlier and the first option for the API syntax.

 
{code:java}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

# df1, df2 and the function's return value are all pandas.DataFrames
@pandas_udf(return_schema, PandasUDFType.COGROUPED_MAP)
def asof_join(df1, df2):
    return pd.merge_asof(df1, df2, on='time')

df1.cogroup(df2, on='product_id', udf=asof_join){code}
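Note that return_schema in the example above is just an ordinary Spark schema describing the columns the UDF returns; a possible definition (column names illustrative only, matching the merge above) might be:

{code:java}
from pyspark.sql.types import StructType, StructField, LongType, StringType, DoubleType

# Illustrative only: the schema must match the columns produced by the UDF.
return_schema = StructType([
    StructField('time', LongType()),
    StructField('product_id', StringType()),
    StructField('quantity', LongType()),
    StructField('price', DoubleType()),
])
{code}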
 
h2. *Target Personas*

Data scientists, data engineers, library developers.
h2. *Scope*
 * Initial implementation will only consider the case of cogrouping exactly two DataFrames. Further work may extend this to multiple DataFrames.
 * API call is to be made available via PySpark only. No equivalent R/Java/Scala functionality will be offered.

h2. *Design*
 * New UDF type, PandasUDFType.COGROUPED_MAP, to be defined in PySpark
 * New public method to be added to either GroupedData or DataFrame to expose cogroup in PySpark
 * New package-private method to be added to RelationalGroupedDataset to allow cogroup in Scala
 * New logical node to be added representing cogroup.
 * New physical node to be added to implement cogroup. This node will ensure correct partitioning of the input DataFrames and create two grouped iterators, which will be piped into the Python process for UDF execution (the per-key contract is sketched after this list).
 * Extend ArrowPythonRunner such that it can serialise two separate DataFrames for input into the Python process.
 * Extend worker.py such that it can accept the two serialised DataFrames produced by the Scala process and then pass them to the UDF function.
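To make the per-key contract of the cogrouped UDF concrete, the following is a purely local Pandas simulation of what the physical node and Python worker together compute. It is a sketch of the intended semantics only, not the proposed implementation:

{code:java}
import pandas as pd

def simulate_cogroup_apply(pdf1, pdf2, key, func):
    # For every key present in either input, call func with the two
    # (possibly empty) per-key groups and concatenate the results.
    keys = sorted(set(pdf1[key]).union(pdf2[key]))
    parts = [func(pdf1[pdf1[key] == k], pdf2[pdf2[key] == k]) for k in keys]
    return pd.concat(parts, ignore_index=True)
{code}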

Inherent in the design is that both the Java executor and the Python worker must have enough memory available to hold the largest group. Furthermore, no partial aggregation is possible, which implies that a shuffle must be performed to colocate the data. Note that both of these properties are shared with the existing groupby().apply() functionality, and the latter is also shared with the existing Dataset cogroup.

The aim is to leverage as much of the current Spark functionality as possible. Data locality can be handled by the existing Spark machinery, while serialisation between the JVM and Python processes will use the existing Arrow mechanism. The main complexity lies in extending the serialisation mechanism to allow two separate DataFrames to be passed from the JVM to the Python process. At present this mechanism appears to be tied to the concept of serialising rows from a single Spark DataFrame into a single Arrow Table and then into a single Pandas DataFrame. In order to extend it, the code on both the JVM and Python sides must be refactored.
h2. *Risks*

The majority of the work required is low risk as it involves adding new functionality which is relatively isolated from any existing code. The exception, however, is the extension of the serialisation code, as this could affect the operation of existing UDFs (both Pandas and non-Pandas).
h2. *Timescales*

Most of the changes are fairly straightforward; however, the complexity around the serialisation refactoring does introduce some uncertainty. With that in mind I would estimate 4-8 weeks of work for the implementation and another 2-3 weeks for testing.


