Posted to issues@spark.apache.org by "Niranda Perera (Jira)" <ji...@apache.org> on 2022/08/30 19:04:00 UTC

[jira] [Commented] (SPARK-40233) Unable to load large pandas dataframe to pyspark

    [ https://issues.apache.org/jira/browse/SPARK-40233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17598021#comment-17598021 ] 

Niranda Perera commented on SPARK-40233:
----------------------------------------

I believe the issue is that the executors cannot load the data from the Python driver (possibly because they do not have enough memory). As I understand it, a single executor would first have to load the entire data dump from the Python driver before it can be repartitioned.
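One quick way to check this hypothesis (a minimal sketch; `spark` and a pandas frame `pdf` are assumed names standing in for the reproducer below) is to inspect how createDataFrame partitions the data before the explicit repartition:

{code:java}
# Sketch: inspect the initial partitioning produced by createDataFrame.
# `spark` is an active SparkSession, `pdf` a pandas DataFrame.
sdf = spark.createDataFrame(pdf)           # before the explicit repartition
print(sdf.rdd.getNumPartitions())          # how many partitions Spark created
print(sdf.rdd.glom().map(len).collect())   # rows per partition (small inputs only)
{code}

If the rows land in only a handful of partitions, those executors bear the full load before any shuffle happens.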

 

My suggestion is to add a `num_partitions` option to the createDataFrame method so that partitioning can be handled at the driver and the data sent to the executors as a list of RDD partitions. Is this an acceptable approach from the POV of Spark internals?
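In the meantime, the same idea can be approximated from user code. A minimal sketch (the `create_dataframe_chunked` helper is hypothetical, not an existing Spark API): split the pandas frame into row chunks at the driver, so that no single serialized batch has to carry the whole frame, then union the pieces.

{code:java}
import functools

import numpy as np

def create_dataframe_chunked(spark, pdf, n):
    # Split pdf into n roughly equal row slices at the driver.
    chunks = np.array_split(pdf, n)
    # Create one Spark DataFrame per chunk; schemas are identical by construction.
    sdfs = [spark.createDataFrame(chunk) for chunk in chunks]
    # Union the per-chunk DataFrames into a single one.
    return functools.reduce(lambda a, b: a.unionByName(b), sdfs)

# usage (names from the reproducer below):
# sdf0 = create_dataframe_chunked(spark, df_l, w).cache()
{code}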

> Unable to load large pandas dataframe to pyspark
> ------------------------------------------------
>
>                 Key: SPARK-40233
>                 URL: https://issues.apache.org/jira/browse/SPARK-40233
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0
>            Reporter: Niranda Perera
>            Priority: Major
>
> I've been trying to join two large pandas dataframes using pyspark using the following code. I'm trying to vary executor cores allocated for the application and measure scalability of pyspark (strong scaling).
> {code:java}
> import gc
> import math
> import time
> 
> import pandas as pd
> from numpy.random import default_rng
> from pyspark.sql import SparkSession
> 
> r = 1000000000  # 1Bn rows
> it = 10
> w = 256
> unique = 0.9
> TOTAL_MEM = 240
> TOTAL_NODES = 14
> max_val = int(r * unique)  # rng.integers needs integer bounds
> rng = default_rng()
> frame_data = rng.integers(0, max_val, size=(r, 2))
> frame_data1 = rng.integers(0, max_val, size=(r, 2))
> print("data generated", flush=True)
> df_l = pd.DataFrame(frame_data).add_prefix("col")
> df_r = pd.DataFrame(frame_data1).add_prefix("col")
> print("data loaded", flush=True)
> procs = int(math.ceil(w / TOTAL_NODES))
> mem = int(TOTAL_MEM * 0.9)
> print(f"world sz {w} procs per worker {procs} mem {mem} iter {it}", flush=True)
> spark = SparkSession\
>     .builder\
>     .appName(f'join {r} {w}')\
>     .master('spark://node:7077')\
>     .config('spark.executor.memory', f'{int(mem*0.6)}g')\
>     .config('spark.executor.pyspark.memory', f'{int(mem*0.4)}g')\
>     .config('spark.cores.max', w)\
>     .config('spark.driver.memory', '100g')\
>     .config('spark.sql.execution.arrow.pyspark.enabled', 'true')\
>     .getOrCreate()
> sdf0 = spark.createDataFrame(df_l).repartition(w).cache()
> sdf1 = spark.createDataFrame(df_r).repartition(w).cache()
> print("data loaded to spark", flush=True)
> try:
>     for i in range(it):
>         t1 = time.time()
>         out = sdf0.join(sdf1, on='col0', how='inner')
>         count = out.count()
>         t2 = time.time()
>         print(f"timings {r} 1 {i} {(t2 - t1) * 1000:.0f} ms, {count}", flush=True)
> 
>         del out
>         del count
>         gc.collect()
> finally:
>     spark.stop()
> {code}
> {*}Cluster{*}: I am using a standalone Spark cluster of 15 nodes, each with 48 cores and 240GB RAM. The master and the driver code run on node1, while the other 14 nodes run workers with maximum memory allocated. In the Spark config, I reserve 90% of the total memory for the executor, splitting it 60% to the JVM and 40% to PySpark.
> {*}Issue{*}: When I run the above program, I can see that executors are assigned to the app, but it makes no progress, even after 60 minutes. For a smaller row count (10M), this worked without a problem. Driver output:
> {code:java}
> world sz 256 procs per worker 19 mem 216 iter 8
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
> 22/08/26 14:52:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> /N/u/d/dnperera/.conda/envs/env/lib/python3.8/site-packages/pyspark/sql/pandas/conversion.py:425: UserWarning: createDataFrame attempted Arrow optimization because 'spark.sql.execution.arrow.pyspark.enabled' is set to true; however, failed by the reason below:
>   Negative initial size: -589934400
> Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
>   warn(msg) {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org