Posted to reviews@spark.apache.org by "igorghi (via GitHub)" <gi...@apache.org> on 2023/09/06 14:42:01 UTC

[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup

igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708508134

   @HyukjinKwon This is not matching what I am seeing with this test case. The batch size setting is definitely changing what is available within the `mapInArrow()` call after the repartition by the grouping column.
   
   ```python
   data = []
   # 20,000 customer_ids x 10 idx values -> 200,000 rows, 20,000 per idx group
   for c in range(1, 20001):
       for aid in range(1, 11):
           data.append([c, aid])
   
   df = spark.createDataFrame(data, schema=["customer_id", "idx"])
   
   # mapInArrow receives an iterator of pyarrow.RecordBatch per partition;
   # print how many rows arrive in each batch.
   def udf(b):
       for i in b:
           print(f"Num of rows per batch: {i.num_rows}")
           yield i
   
   print("Setting batch size to 10K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 20K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   print("Setting batch size to 30K")
   spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
   df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   print("\n")
   
   # applyInPandas hands each group to the UDF as a single pandas DataFrame,
   # regardless of maxRecordsPerBatch.
   def udf2(pdf):
       print(f"Num of rows per group: {len(pdf)}")
       return pdf
   
   print("\n")
   print("Using applyInPandas")
   df.groupby("idx").applyInPandas(udf2, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```
   
   The output:
   ```
   Setting batch size to 10K
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   Num of rows per batch: 10000
   
   
   Setting batch size to 20K
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   Num of rows per batch: 20000
   
   
   Setting batch size to 30K
   Num of rows per batch: 30000
   Num of rows per batch: 30000
   Num of rows per batch: 30000
   Num of rows per batch: 30000
   Num of rows per batch: 30000
   Num of rows per batch: 30000
   Num of rows per batch: 20000
   
   
   
   
   Using applyInPandas
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   Num of rows per group: 20000
   
   
   ```
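   
   Note the tail of the 30K run: the last batch has only 20,000 rows, most likely because the post-repartition partitions were coalesced (AQE is on by default) so multiple `idx` groups share one partition, and `mapInArrow` slices batches purely by `maxRecordsPerBatch` with no awareness of group boundaries.
   
   For contrast, here is a minimal sketch of the grouped path this PR adds, assuming the `applyInArrow` signature proposed here (i.e. the UDF receives each group as a single `pyarrow.Table`, analogous to what `applyInPandas` does with pandas):
   
   ```python
   import pyarrow as pa
   
   # Sketch only: assumes the grouped applyInArrow proposed in this PR hands
   # the UDF one pyarrow.Table per "idx" group, so the printed size should be
   # the group size (20,000), independent of maxRecordsPerBatch.
   def arrow_udf(t: pa.Table) -> pa.Table:
       print(f"Num of rows per group: {t.num_rows}")
       return t
   
   df.groupby("idx").applyInArrow(arrow_udf, schema=df.schema).write.format("noop").mode("overwrite").save()
   ```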

