Posted to reviews@spark.apache.org by "igorghi (via GitHub)" <gi...@apache.org> on 2023/09/06 14:42:01 UTC
[GitHub] [spark] igorghi commented on pull request #38624: [SPARK-40559][PYTHON] Add applyInArrow to groupBy and cogroup
igorghi commented on PR #38624:
URL: https://github.com/apache/spark/pull/38624#issuecomment-1708508134
@HyukjinKwon This does not match what I am seeing with this test case. The batch size setting definitely changes what is available within the `mapInArrow()` call after repartitioning by the grouping column.
```python
data = []
for c in range(1, 20001):
    for aid in range(1, 11):
        data.append([c, aid])

df = spark.createDataFrame(data, schema=["customer_id", "idx"])

def udf(b):
    # Iterate over the Arrow record batches handed to mapInArrow
    for i in b:
        print(f"Num of rows per partition: {i.num_rows}")
        yield i

print("Setting batch size to 10K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 20K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 20000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

print("Setting batch size to 30K")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 30000)
df.repartition("idx").mapInArrow(udf, schema=df.schema).write.format("noop").mode("overwrite").save()
print("\n")

def udf2(df):
    # applyInPandas receives the whole group as a single pandas DataFrame
    print(f"Num of rows per group: {len(df)}")
    return df

print("Using applyInPandas")
df.groupby("idx").applyInPandas(udf2, schema=df.schema).write.format("noop").mode("overwrite").save()
```
The output:
```
Setting batch size to 10K
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Num of rows per partition: 10000
Setting batch size to 20K
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Num of rows per partition: 20000
Setting batch size to 30K
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 30000
Num of rows per partition: 20000
Using applyInPandas
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
Num of rows per group: 20000
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org