Posted to issues@spark.apache.org by "Jelmer Kuperus (Jira)" <ji...@apache.org> on 2023/10/26 08:33:00 UTC

[jira] [Created] (SPARK-45675) Specify number of partitions when creating spark dataframe from pandas dataframe

Jelmer Kuperus created SPARK-45675:
--------------------------------------

             Summary: Specify number of partitions when creating spark dataframe from pandas dataframe
                 Key: SPARK-45675
                 URL: https://issues.apache.org/jira/browse/SPARK-45675
             Project: Spark
          Issue Type: Improvement
          Components: Pandas API on Spark
    Affects Versions: 3.5.0
            Reporter: Jelmer Kuperus


When converting a large pandas dataframe to a spark dataframe like so:

{code:python}
import pandas as pd

pdf = pd.DataFrame(
    [
        {"board_id": "3074457346698037360_0", "file_name": "board-content", "value": "A" * 119251}
        for i in range(0, 20000)
    ]
)

spark.createDataFrame(pdf).write.mode("overwrite").format("delta").saveAsTable("catalog.schema.table")
{code}
you can encounter the following error:

{noformat}
org.apache.spark.SparkException: Job aborted due to stage failure: Serialized task 11:1 was 366405365 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes). Consider increasing spark.rpc.message.maxSize or using broadcast variables for large values.
{noformat}

As far as I can tell, Spark first converts the pandas dataframe into a Python list and then constructs an RDD out of that list. This means the parallelism is determined by spark.sparkContext.defaultParallelism, so if the pandas dataframe is very large and the number of available cores is low, you end up with very large tasks that exceed the limit on serialized task size. In the example above, 20000 rows of roughly 119 KB each add up to about 2.4 GB; spread over only a handful of default partitions, each task easily exceeds the 256 MB limit reported in the error.
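In other words, a simplified sketch of my mental model of what happens today (not the actual implementation, and reusing the spark session and the pdf dataframe from the repro above):

{code:python}
# Simplified mental model, not the real implementation: the whole pandas frame
# ends up as one Python collection on the driver, which is then parallelized
# without an explicit slice count.
records = pdf.to_dict("records")               # entire frame materialised on the driver
rdd = spark.sparkContext.parallelize(records)  # numSlices not given, falls back to defaultParallelism
# Each slice is shipped to an executor as a single task, so with few cores and a
# big frame the per-task payload becomes huge.
{code}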

 

Methods like spark.sparkContext.parallelize allow you to pass in the number of partitions of the resulting dataset. I think having a similar capability when creating a dataframe from a pandas dataframe makes a lot of sense, because right now the only workaround I can think of is changing the value of spark.default.parallelism, and that is a system-wide setting. A rough sketch of what I have in mind is shown below.
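To make the ask concrete, here is a sketch. The numPartitions argument in the commented-out call is hypothetical and does not exist today; the rest is a manual detour through parallelize that approximates it with the current API, assuming the spark session and the pdf dataframe from the repro above. The partition count of 256 is only an illustrative value.

{code:python}
from pyspark.sql import Row

# Hypothetical API: let createDataFrame accept a partition count, analogous to
# the numSlices argument of spark.sparkContext.parallelize.
# spark.createDataFrame(pdf, numPartitions=256)

# Manual approximation with the current API; 256 is an arbitrary illustrative
# value chosen so each serialized task stays under spark.rpc.message.maxSize.
rows = [Row(**record) for record in pdf.to_dict("records")]
rdd = spark.sparkContext.parallelize(rows, numSlices=256)

sdf = spark.createDataFrame(rdd)
sdf.write.mode("overwrite").format("delta").saveAsTable("catalog.schema.table")
{code}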


