Posted to issues@spark.apache.org by "Michael Tong (Jira)" <ji...@apache.org> on 2023/03/02 04:58:00 UTC

[jira] [Created] (SPARK-42645) Introduce feature to allow for function caching across input rows.

Michael Tong created SPARK-42645:
------------------------------------

             Summary: Introduce feature to allow for function caching across input rows.
                 Key: SPARK-42645
                 URL: https://issues.apache.org/jira/browse/SPARK-42645
             Project: Spark
          Issue Type: Wish
          Components: Optimizer
    Affects Versions: 3.3.2
            Reporter: Michael Tong


Introduce the ability to make functions cacheable across input rows. I'm imagining this feature working similarly to Python's [functools.cache|https://docs.python.org/3/library/functools.html#functools.cache], where you could add a decorator to certain expensive functions that you know will regularly encounter repeated values as you read the input data.

 

With this feature you could significantly speed up many real-world jobs that apply expensive functions to data with naturally repeated column values. One example is parsing user agent fields from internet traffic logs partitioned by user id. Even though the data is not sorted by user agent, a sample of 10k consecutive rows contains far fewer than 10k unique values, because popular user agents account for a large fraction of traffic and the user agent of a user's first event is likely to be shared by all subsequent events from that user. Currently you can hack an approximation of this in Python via pandas_udfs. This works because pandas_udfs read input in batches of 10k rows by default, so you can use a caching UDF whose cache is emptied every 10k rows. At my current job I have noticed that some applications of this trick significantly speed up queries where custom UDFs are the bottleneck. An example of this is

 
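{code:python}
import functools

import pandas as pd
from pyspark.sql import functions as F, types as T

@F.pandas_udf(T.StringType())
def parse_user_agent_field(user_agent_series: pd.Series) -> pd.Series:
    # The helper is redefined on every call, so its cache only lives for
    # one input batch (10k rows by default) and is then discarded.
    @functools.cache
    def parse_user_agent_field_helper(user_agent):
        # parse the user agent and return the relevant field
        return None
    return user_agent_series.apply(parse_user_agent_field_helper){code}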
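
To make the effect of the per-batch cache concrete, here is a minimal sketch in plain Python with no Spark involved. The names parse_batch and expensive_parse and the sample user agents are made up for illustration, and functools.lru_cache(maxsize=None) stands in for functools.cache so the sketch also runs on Python versions before 3.9.

```python
import functools

calls = 0  # counts how often the expensive function really runs


def expensive_parse(user_agent):
    """Stand-in for an expensive parser: returns the product token."""
    global calls
    calls += 1
    return user_agent.split("/")[0]


def parse_batch(user_agents, parse):
    """Apply `parse` to one batch, using a cache that is dropped afterwards."""
    cached_parse = functools.lru_cache(maxsize=None)(parse)
    results = [cached_parse(ua) for ua in user_agents]
    return results, cached_parse.cache_info()


# One small "batch" with repeated values, as in the traffic-log example.
batch = ["Mozilla/5.0", "curl/7.88", "Mozilla/5.0", "Mozilla/5.0", "curl/7.88"]
results, info = parse_batch(batch, expensive_parse)
print(results, info.hits, info.misses, calls)
```

Here the expensive function runs once per distinct value in the batch rather than once per row, which is where the speedup in the pandas_udf version comes from.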
 

 

It would be nice if there were official support for this behavior for both built-in functions and UDFs. If there were, I'd imagine it looking something like

 
{code:python}
# using pyspark dataframe API
# NOTE: F.cache is the proposed API and does not exist today
df = df.withColumn(output_col, F.cache(F.function)(input_col)){code}
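
Since F.cache in the snippet above is only a proposal, the following plain-Python sketch illustrates one set of semantics such a wrapper might have: a bounded LRU cache that survives across batches instead of being emptied after each one. The helper cache_across_batches, its maxsize parameter, and parse_browser are hypothetical names chosen for this illustration. They are not part of PySpark.

```python
import functools


def cache_across_batches(func, maxsize=10_000):
    """Hypothetical helper: reuse results of `func` across successive batches.

    The cache is bounded so memory stays limited on long-running workers.
    """
    cached = functools.lru_cache(maxsize=maxsize)(func)

    def apply_to_batch(values):
        return [cached(v) for v in values]

    # Expose hit/miss statistics of the shared cache for inspection.
    apply_to_batch.cache_info = cached.cache_info
    return apply_to_batch


parse_calls = []  # records every value that reached the real function


def parse_browser(user_agent):
    parse_calls.append(user_agent)
    return user_agent.split("/")[0]


cached_parse = cache_across_batches(parse_browser, maxsize=2)
first = cached_parse(["Mozilla/5.0", "curl/7.88", "Mozilla/5.0"])
second = cached_parse(["curl/7.88", "Mozilla/5.0"])  # served from the cache
print(first, second, cached_parse.cache_info())
```

Bounding the cache is a design choice of this sketch. An official feature would also have to decide how long a cache lives (per task, per executor, or per query) and would need to be restricted to deterministic functions.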
 

 


