Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/07/18 16:39:41 UTC

[GitHub] [spark] d80tb7 commented on issue #24981: [WIP][SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs- Arrow Stream Impl

d80tb7 commented on issue #24981: [WIP][SPARK-27463][PYTHON] Support Dataframe Cogroup via Pandas UDFs- Arrow Stream Impl
URL: https://github.com/apache/spark/pull/24981#issuecomment-512893972
 
 
   Hi @icexelloss, @hjoo 
   
    To answer your questions about the API: yes, the `df1.cogroup(df2, on='id').apply(func)` form is more succinct, and yes, we could easily add a simple wrapper to make both forms available (see the sketch below). Personally I'd prefer not to add a wrapper, because I believe that having multiple APIs to accomplish the same thing is a bit confusing, but if people want to adopt it as a compromise I think I could be persuaded.
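    
    As a minimal sketch (the free-standing `cogroup` helper below is hypothetical and not part of this PR), such a wrapper could simply delegate to the two-groupby form:
    
    ```python
    def cogroup(left, right, on):
        # Hypothetical convenience wrapper: group both DataFrames on the same
        # key and return the cogrouped object that the
        # df.groupby(...).cogroup(...) form in this PR already produces.
        return left.groupby(on).cogroup(right.groupby(on))
    
    # Usage would then be: cogroup(df1, df2, on='id').apply(func)
    ```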
   
    As for the two API calls, we've been back and forth on this a few times. Just to clear up one point made earlier, the `df1.groupby('id').cogroup(df2.groupby('id')).apply(func)` form *does* allow you to group by different key columns (or expressions) on each side, as shown in the snippet below. The `df1.cogroup(df2, on='id').apply(func)` form could conceivably allow this too, but we would have to accept some sort of expression with aliasing (as join does) for that to work.
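    
    For example (the key column names and `merge_func` are made-up placeholders, shown against the API proposed in this PR), the two-groupby form lets each side be keyed independently:
    
    ```python
    # Each DataFrame is grouped by its own key column before cogrouping.
    result = df1.groupby('customer_id') \
        .cogroup(df2.groupby('cust_id')) \
        .apply(merge_func)
    ```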
   
    I'd really like to agree on the final API before doing a final PR though, mainly because I don't think we should commit anything that constitutes a public API without it being agreed. If nothing else I'd like to hear what @HyukjinKwon thinks, as he expressed an interest in the API on the JIRA and probably has a good feel for how the PySpark APIs should look.
   
    Finally, I've been on holiday this week so I haven't had much time for coding, but I did run some benchmarks comparing the Arrow Stream and non-Arrow Stream versions of this code. The driver program was as follows:
   
    ```python
   from pyspark.sql import SparkSession
   from pyspark.sql.functions import col, pandas_udf, PandasUDFType
   
   spark = SparkSession\
       .builder\
       .master('local[1]')\
       .config('spark.sql.shuffle.partitions', 1)\
       .getOrCreate()
   
    # Cogrouped map UDF that simply returns the left-hand group unchanged.
    @pandas_udf('id long, t long, v double', PandasUDFType.COGROUPED_MAP)
    def only_left(l, _):
        return l
   
   group_sizes = [1, 10, 100, 1000, 10000, 100000]
   
    for group_size in group_sizes:
        # 100k rows total; 'id' is rescaled so each group contains group_size rows.
        df = spark.range(0, 100000) \
            .withColumn('t', col('id')) \
            .withColumn('v', col('id').cast('double')) \
            .withColumn('id', (col('id') / group_size).cast('long'))
        print('Group Size: ' + str(group_size))
       df\
           .groupby('id')\
           .cogroup(df.groupby('id'))\
           .apply(only_left)\
           .write.mode('overwrite').parquet('output')
   ```
    In short, this cogroups a 100k-row DataFrame with itself using a single partition. Results are as follows (all times in ms, as reported by the total section of `ReaderIterator.handleTimingData`):
   
    | Group Size | Arrow Stream (ms) | Non-Arrow Stream (ms) |
    | -------------: | -------------: | -------------: |
    | 1 | 261,718 | 258,950 |
    | 10 | 25,491 | 25,208 |
    | 100 | 2,580 | 2,521 |
    | 1,000 | 291 | 298 |
    | 10,000 | 59 | 74 |
    | 100,000 | 13 | 49 |
   
    I think this demonstrates that the overhead of sending the schema is pretty minimal (although at some point I might look at where the overhead does come from, as it's slower than I would expect, and a simple groupby().apply() appears to exhibit the same), so unless anyone says otherwise I'll assume that sticking with Arrow streams is the way we'll go on this.
   
    To wrap up (and apologies for the length of this post), I'll continue with my plan of getting this into a mergeable state.
   
   Chris
   
   
