Posted to issues@spark.apache.org by "Lior Chaga (Jira)" <ji...@apache.org> on 2020/01/01 08:57:00 UTC

[jira] [Commented] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

    [ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17006345#comment-17006345 ] 

Lior Chaga commented on SPARK-24906:
------------------------------------

[~habren] 
The suggested approach of using a multiplier will work well for fairly simple schemas. 
It will not work, however, for very complex schemas that include repeated fields, since the number of repetitions cannot be predicted. 

I suggest an alternative approach that would provide a better estimate for complex schemas, but would require a format-specific implementation. 
For Parquet, for instance, we could sample the metadata of several row groups (the sample ratio may be configurable) and use the ratio of the required columns' size to the total row-group size to get a fairly accurate estimate of the data to be read. 
The lower the variance in metadata across different row groups of the data, the smaller the sample ratio that can be used. 
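
To make the idea concrete, here is a rough sketch, assuming the parquet-hadoop footer API (ParquetFileReader.readFooter and the row-group metadata classes); the requiredColumns and sampleRatio parameters are hypothetical names I am introducing for illustration, not existing Spark configuration:

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import scala.collection.JavaConverters._
import scala.util.Random

// Estimate what fraction of each row group the required columns occupy,
// by sampling the row-group metadata from a subset of the file footers.
def estimateReadFraction(files: Seq[Path],
                         requiredColumns: Set[String],
                         sampleRatio: Double,
                         conf: Configuration): Double = {
  val sampled = Random.shuffle(files).take(math.max(1, (files.size * sampleRatio).toInt))
  val ratios = sampled.flatMap { file =>
    val footer = ParquetFileReader.readFooter(conf, file)
    footer.getBlocks.asScala.map { block =>
      val total = block.getTotalByteSize.toDouble
      val selected = block.getColumns.asScala
        .filter(c => requiredColumns.contains(c.getPath.toDotString))
        .map(_.getTotalUncompressedSize.toDouble)
        .sum
      if (total > 0) selected / total else 1.0
    }
  }
  if (ratios.isEmpty) 1.0 else ratios.sum / ratios.size
}

// The split size could then be enlarged to roughly
// spark.sql.files.maxPartitionBytes / estimated fraction.
{code}

Since only footers are read, the sampling cost should stay small even for large tables.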

> Adaptively set split size for columnar file to ensure the task read data size fit expectation
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24906
>                 URL: https://issues.apache.org/jira/browse/SPARK-24906
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Jason Guo
>            Priority: Major
>         Attachments: image-2018-07-24-20-26-32-441.png, image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, image-2018-07-24-20-30-24-552.png
>
>
> For a columnar file format, when Spark SQL reads the table, each split will be 128 MB by default since spark.sql.files.maxPartitionBytes defaults to 128 MB. Even when the user sets it to a larger value, such as 512 MB, a task may read only a few MB or even hundreds of KB, because the table (Parquet) may consist of dozens of columns while the SQL only needs a few of them, and Spark will prune the unnecessary columns.
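> For reference, a minimal illustration of the current, static behavior (the 512 MB value is only an example, and spark is assumed to be a SparkSession):
> {code:scala}
> // Today the split size is fixed by this conf, regardless of how few columns the query reads.
> spark.conf.set("spark.sql.files.maxPartitionBytes", 512L * 1024 * 1024)  // 512 MB
> {code}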
>  
> In this case, Spark's DataSourceScanExec can enlarge maxPartitionBytes adaptively. 
> For example, suppose there are 40 columns: 20 are integer and the other 20 are long. When a query reads one integer column and one long column, maxPartitionBytes should be 20 times larger: (20*4 + 20*8) / (4 + 8) = 20. 
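> A rough sketch of that multiplier computation (the fixed type widths and helper names are only illustrative; this is not an existing Spark API):
> {code:scala}
> import org.apache.spark.sql.types._
>
> // Illustrative fixed-width sizes per type; a real estimator would also need to
> // handle strings, nested types, encoding and compression.
> def typeWidth(dt: DataType): Int = dt match {
>   case IntegerType | FloatType => 4
>   case LongType | DoubleType   => 8
>   case _                       => 8
> }
>
> def splitMultiplier(schema: StructType, requiredColumns: Seq[String]): Double = {
>   val totalWidth    = schema.fields.map(f => typeWidth(f.dataType)).sum.toDouble
>   val requiredWidth = schema.fields
>     .filter(f => requiredColumns.contains(f.name))
>     .map(f => typeWidth(f.dataType)).sum.toDouble
>   if (requiredWidth > 0) totalWidth / requiredWidth else 1.0
> }
>
> // 40 columns (20 int + 20 long), query reads one int and one long:
> // (20*4 + 20*8) / (4 + 8) = 20, so the effective split size would be
> // maxPartitionBytes * 20.
> {code}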
>  
> With this optimization, the number of tasks will be smaller and the job will run faster. More importantly, for a very large cluster (more than 10 thousand nodes), it will relieve the resource manager's scheduling pressure.
>  
> Here is the test:
>  
> The table named test2 has more than 40 columns and receives more than 5 TB of data each hour.
> When we issue a very simple query:
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the job takes 4.8 minutes.
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB of data.
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job takes only 30 seconds. It is almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median amount of data read per task is 44.2 MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org