Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2020/01/30 01:08:00 UTC

[jira] [Resolved] (SPARK-30670) Pipes for PySpark

     [ https://issues.apache.org/jira/browse/SPARK-30670?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-30670.
----------------------------------
    Resolution: Duplicate

> Pipes for PySpark
> -----------------
>
>                 Key: SPARK-30670
>                 URL: https://issues.apache.org/jira/browse/SPARK-30670
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.4.4
>            Reporter: Vincent
>            Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> I would propose to add a `pipe` method to the Spark DataFrame. It enables a functional, method-chaining style of programming, inspired by the tidyverse, that is currently missing. The pandas community also recently adopted this pattern, documented [here|https://tomaugspurger.github.io/method-chaining.html].
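> For reference, pandas already ships this method: `df.pipe(func, *args, **kwargs)` simply calls `func(df, *args, **kwargs)`, which lets user-defined steps sit in a chain. A minimal sketch of the pandas pattern (the `add_weekday` helper here is just an illustration, not part of this proposal):
> {code:python}
> import pandas as pd
> 
> def add_weekday(dataf, column="date"):
>     # derive a weekday name column from an existing date column
>     return dataf.assign(weekday=pd.to_datetime(dataf[column]).dt.day_name())
> 
> clean_df = (pd.read_parquet("<filepath>")
>   .pipe(add_weekday, column="date"))
> {code}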
> This is the idea. Suppose you had this:
> {code:python}
> # file that has [user, date, timestamp, eventtype]
> import pyspark.sql.functions as sf
> from pyspark.sql import Window
> 
> ddf = spark.read.parquet("<filepath>")
> 
> w_user = Window.partitionBy("user")
> w_user_time = Window.partitionBy("user").orderBy("timestamp")
> 
> thres_sesstime = 60 * 15
> min_n_rows = 10
> min_n_sessions = 5
> 
> clean_ddf = (ddf
>   # time since the user's previous event; lag needs an ordered window
>   .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
>   # a gap above the threshold marks the start of a new session
>   .withColumn("new_session", (sf.col("delta") > thres_sesstime).cast("integer"))
>   # running sum of session starts gives a per-user session id
>   .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
>   .drop("new_session")
>   .drop("delta")
>   # aggregate functions need a window when used inside withColumn
>   .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
>   .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
>   .filter(sf.col("nrow_user") > min_n_rows)
>   .filter(sf.col("nrow_user_date") > min_n_sessions)
>   .drop("nrow_user")
>   .drop("nrow_user_date"))
> {code}
> The code works and it is somewhat clear: we add a session column to the dataframe and then use it to remove outliers. The issue is that this chain of commands can get quite long, so it might be better to turn it into functions.
> {code:python}
> def add_session(dataf, session_threshold=60 * 15):
>     w_user_time = Window.partitionBy("user").orderBy("timestamp")
> 
>     return (dataf
>       .withColumn("delta", sf.col("timestamp") - sf.lag("timestamp").over(w_user_time))
>       .withColumn("new_session", (sf.col("delta") > session_threshold).cast("integer"))
>       .withColumn("session", sf.sum(sf.col("new_session")).over(w_user_time))
>       .drop("new_session")
>       .drop("delta"))
> 
> 
> def remove_outliers(dataf, min_n_rows=10, min_n_sessions=5):
>     w_user = Window.partitionBy("user")
> 
>     return (dataf
>       .withColumn("nrow_user", sf.count(sf.col("timestamp")).over(w_user))
>       .withColumn("nrow_user_date", sf.approx_count_distinct(sf.col("date")).over(w_user))
>       .filter(sf.col("nrow_user") > min_n_rows)
>       .filter(sf.col("nrow_user_date") > min_n_sessions)
>       .drop("nrow_user")
>       .drop("nrow_user_date"))
> {code}
> The issue does not lie in these functions. These functions are great! You can unit test them and they give you nice verbs that act as an abstraction. The issue is in how you now need to apply them.
> {code:python}
> remove_outliers(add_session(ddf, session_threshold=1000), min_n_rows=11)
> {code}
> It would be much nicer to allow for this instead:
> {code:python}
> (ddf
>   .pipe(add_session, session_threshold=900)
>   .pipe(remove_outliers, min_n_rows=11))
> {code}
> The cool thing about this is that it makes method chaining really easy, and it gives you a great way to separate high-level code from low-level code. You still allow customization at the high level by exposing keyword arguments, but during debugging you can easily find the lower-level code because the details are contained in their own functions.
> For code maintenance, I've relied on this pattern a lot personally, but so far I've always had to monkey-patch Spark to be able to do this.
> {code:python}
> from pyspark.sql import DataFrame
> 
> def pipe(self, func, *args, **kwargs):
>     return func(self, *args, **kwargs)
> 
> # attach the method so any DataFrame can use it in a chain
> DataFrame.pipe = pipe
> {code}
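> Once the method is attached, any function that takes a DataFrame as its first argument can be dropped into a chain. A small usage sketch, reusing the functions defined above (`sample_users` is a hypothetical helper, not part of this proposal):
> {code:python}
> def sample_users(dataf, fraction=0.1, seed=42):
>     # keep a random subset of users to speed up experimentation
>     users = dataf.select("user").distinct().sample(fraction=fraction, seed=seed)
>     return dataf.join(users, on="user", how="inner")
> 
> sampled_ddf = (ddf
>   .pipe(add_session, session_threshold=900)
>   .pipe(remove_outliers, min_n_rows=11)
>   .pipe(sample_users, fraction=0.2))
> {code}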
> Could I perhaps add these few lines of code to the codebase?
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org