You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jason Guo (JIRA)" <ji...@apache.org> on 2018/08/07 00:32:00 UTC

[jira] [Updated] (SPARK-24906) Adaptively set split size for columnar file to ensure the task read data size fit expectation

     [ https://issues.apache.org/jira/browse/SPARK-24906?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Guo updated SPARK-24906:
------------------------------
    Summary: Adaptively set split size for columnar file to ensure the task read data size fit expectation  (was: Enlarge split size for columnar file to ensure the task read enough data)

> Adaptively set split size for columnar file to ensure the task read data size fit expectation
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-24906
>                 URL: https://issues.apache.org/jira/browse/SPARK-24906
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Jason Guo
>            Priority: Critical
>         Attachments: image-2018-07-24-20-26-32-441.png, image-2018-07-24-20-28-06-269.png, image-2018-07-24-20-29-24-797.png, image-2018-07-24-20-30-24-552.png
>
>
> For columnar file, such as, when spark sql read the table, each split will be 128 MB by default since spark.sql.files.maxPartitionBytes is default to 128MB. Even when user set it to a large value, such as 512MB, the task may read only few MB or even hundreds of KB. Because the table (Parquet) may consists of dozens of columns while the SQL only need few columns. And spark will prune the unnecessary columns.
>  
> In this case, spark DataSourceScanExec can enlarge maxPartitionBytes adaptively. 
> For example, there is 40 columns , 20 are integer while another 20 are long. When use query on an integer type column and an long type column, the maxPartitionBytes should be 20 times larger. (20*4+20*8) /  (4+8) = 20. 
>  
> With this optimization, the number of task will be smaller and the job will run faster. More importantly, for a very large cluster (more the 10 thousand nodes), it will relieve RM's schedule pressure.
>  
> Here is the test
>  
> The table named test2 has more than 40 columns and there are more than 5 TB data each hour.
> When we issue a very simple query 
>  
> {code:java}
> select count(device_id) from test2 where date=20180708 and hour='23'{code}
>  
> There are 72176 tasks and the duration of the job is 4.8 minutes
> !image-2018-07-24-20-26-32-441.png!
>  
> Most tasks last less than 1 second and read less than 1.5 MB data
> !image-2018-07-24-20-28-06-269.png!
>  
> After the optimization, there are only 1615 tasks and the job last only 30 seconds. It almost 10 times faster.
> !image-2018-07-24-20-29-24-797.png!
>  
> The median of read data is 44.2MB. 
> !image-2018-07-24-20-30-24-552.png!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org