You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "RJ Marcus (Jira)" <ji...@apache.org> on 2022/08/10 22:02:00 UTC

[jira] [Updated] (SPARK-40038) spark.sql.files.maxPartitionBytes does not observe on-disk compression

     [ https://issues.apache.org/jira/browse/SPARK-40038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

RJ Marcus updated SPARK-40038:
------------------------------
    Attachment: Screenshot from 2022-08-10 16-59-56.png
                Screenshot from 2022-08-10 16-50-37.png

> spark.sql.files.maxPartitionBytes does not observe on-disk compression
> ----------------------------------------------------------------------
>
>                 Key: SPARK-40038
>                 URL: https://issues.apache.org/jira/browse/SPARK-40038
>             Project: Spark
>          Issue Type: Question
>          Components: Input/Output, Optimizer, PySpark, SQL
>    Affects Versions: 3.2.0
>         Environment: files:
> - ORC with snappy compression
> - 232 GB files on disk 
> - 1800 files on disk (pretty sure no individual file is over 200MB)
> - 9 partitions on disk
> cluster:
> - EMR 6.6.0 (spark 3.2.0)
> - cluster: 288 vCPU (executors), 1.1TB memory (executors)
> OS info:
> LSB Version:    :core-4.1-amd64:core-4.1-noarch:cxx-4.1-amd64:cxx-4.1-noarch:desktop-4.1-amd64:desktop-4.1-noarch:languages-4.1-amd64:languages-4.1-noarch:printing-4.1-amd64:printing-4.1-noarch
> Distributor ID:    Amazon
> Description:    Amazon Linux release 2 (Karoo)
> Release:    2
> Codename:    Karoo
>            Reporter: RJ Marcus
>            Priority: Major
>         Attachments: Screenshot from 2022-08-10 16-50-37.png, Screenshot from 2022-08-10 16-59-56.png
>
>
> Why does `spark.sql.files.maxPartitionBytes` estimate the number of partitions based on {_}file size on disk instead of the uncompressed file size{_}?
> For example I have a dataset that is 213GB on disk. When I read this in to my application I get 2050 partitions based on the default value of 128MB for maxPartitionBytes. My application is a simple broadcast index join that adds 1 column to the dataframe and writes it out. There is no shuffle.
> Initially the size of input /output records seem ok, but I still get a large amount of memory "spill" on the executors. I believe this is due to the data being highly compressed and each partition becoming too big when it is deserialized to work on in memory.
> !image-2022-08-10-16-59-05-233.png!
> (If I try to do a repartition immediately after reading I still see the first stage spilling memory to disk, so that is not the right solution or what I'm interested in.) 
> Instead, I attempt to lower maxPartitionBytes by the (average) compression ratio of my files (about 7x, so let's round up to 8). So I set maxPartitionBytes=16MB.  At this point  I see that spark is reading in from the file in 12-28 MB chunks. Now it makes 14316 partitions on the initial file read and completes with no spillage. 
> !image-2022-08-10-16-59-59-778.png!
>  
> Is there something I'm missing here? Is this just intended behavior? How can I tune my partition size correctly for my application when I do not know how much the data will be compressed ahead of time?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org