Posted to issues@spark.apache.org by "Dongjoon Hyun (Jira)" <ji...@apache.org> on 2021/09/02 16:20:00 UTC

[jira] [Commented] (SPARK-36443) Demote BroadcastJoin causes performance regression and increases OOM risks

    [ https://issues.apache.org/jira/browse/SPARK-36443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17408958#comment-17408958 ] 

Dongjoon Hyun commented on SPARK-36443:
---------------------------------------

Thank you for the details.

> Demote BroadcastJoin causes performance regression and increases OOM risks
> --------------------------------------------------------------------------
>
>                 Key: SPARK-36443
>                 URL: https://issues.apache.org/jira/browse/SPARK-36443
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kent Yao
>            Priority: Major
>         Attachments: image-2021-08-06-11-24-34-122.png, image-2021-08-06-17-57-15-765.png, screenshot-1.png
>
>
>  
> h2. A test case
> Run the case below with bin/spark-sql from 3.1.2 in local mode, leaving all other settings at their defaults.
> {code:sql}
> -- Settings: AQE enabled, 20 shuffle partitions, 200-byte broadcast threshold
> set spark.sql.shuffle.partitions=20;
> set spark.sql.adaptive.enabled=true;
> -- set spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin=0; -- (default 0.2) uncomment this so that the bhj is not demoted
> set spark.sql.autoBroadcastJoinThreshold=200;
> SELECT
>   l.id % 12345 k,
>   sum(l.id) sum,
>   count(l.id) cnt,
>   avg(l.id) avg,
>   min(l.id) min,
>   max(l.id) max
> from (select id % 3 id from range(0, 1e8, 1, 100)) l
>   left join (SELECT max(id) as id, id % 2 gid FROM range(0, 1000, 2, 100) group by gid) r ON l.id = r.id
> GROUP BY 1;
> {code}
>  
>  1. The bhj is demoted, with nonEmptyPartitionRatioForBroadcastJoin commented out (default 0.2)
>  
> ||Job Id||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
> |4|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:37|71 ms|1/1 (4 skipped)|3/3 (205 skipped)|
> |3|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:18|19 s|1/1 (3 skipped)|4/4 (201 skipped)|
> |2|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:18|87 ms|1/1 (1 skipped)|1/1 (100 skipped)|
> |1|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:16|2 s|1/1|100/100|
> |0|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 17:31:15|2 s|1/1|100/100|
>  2. Set nonEmptyPartitionRatioForBroadcastJoin to 0 to tell Spark not to demote the bhj
>  
> ||Job Id (Job Group)||Description||Submitted||Duration||Stages: Succeeded/Total||Tasks (for all stages): Succeeded/Total||
> |5|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:15|29 ms|1/1 (2 skipped)|3/3 (200 skipped)|
> |4|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:13|2 s|1/1 (1 skipped)|100/100 (100 skipped)|
> |3 (700fefe1-8446-4761-9be2-b68ed6e84c11)|broadcast exchange (runId 700fefe1-8446-4761-9be2-b68ed6e84c11) ($anonfun$withThreadLocalCaptured$1 at FutureTask.java:266)|2021/08/06 18:25:13|54 ms|1/1 (2 skipped)|1/1 (101 skipped)|
> |2|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:13|88 ms|1/1 (1 skipped)|1/1 (100 skipped)|
> |1|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:10|2 s|1/1|100/100|
> |0|the test query above (main at NativeMethodAccessorImpl.java:0)|2021/08/06 18:25:10|3 s|1/1|100/100|
> The subquery `(select id % 3 id from range(0, 1e8, 1, 100)) l` produces highly compressed shuffle map output and leaves 17 of the 20 partitions empty on the reduce side, which is also the AQE re-optimization point for DynamicJoinSelection.
> {code:java}
> Exchange
> shuffle records written: 100,000,000
> shuffle write time total (min, med, max )
> 891 ms (2 ms, 5 ms, 33 ms )
> records read: 100,000,000
> local bytes read total (min, med, max )
> 10.0 MiB (3.3 MiB, 3.4 MiB, 3.4 MiB )
> fetch wait time total (min, med, max )
> 0 ms (0 ms, 0 ms, 0 ms )
> remote bytes read: 0.0 B
> local blocks read: 300
> remote blocks read: 0
> data size total (min, med, max )
> 1525.9 MiB (15.3 MiB, 15.3 MiB, 15.3 MiB )
> remote bytes read to disk: 0.0 B
> shuffle bytes written total (min, med, max )
> 10.0 MiB (102.3 KiB, 102.3 KiB, 102.3 KiB )
> {code}
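
The 17/20 figure can be related to the 0.2 default with a little arithmetic. The sketch below is only an illustration: `non_empty_ratio` and `would_demote` are hypothetical helper names, and the rule they encode (demote when the share of non-empty partitions is positive but below the configured ratio) is an assumption about how the setting is applied, not a copy of Spark's code. The partition sizes are taken loosely from the exchange metrics above.

```python
# Illustrative only: these helpers are hypothetical, not Spark APIs.
MIB = 1024 ** 2


def non_empty_ratio(bytes_by_partition):
    """Return the fraction of reduce partitions that received any data."""
    total = len(bytes_by_partition)
    if total == 0:
        return 0.0
    non_empty = sum(1 for size in bytes_by_partition if size > 0)
    return non_empty / total


def would_demote(bytes_by_partition, threshold=0.2):
    """Assumed rule: demote when the non-empty ratio is positive but below threshold."""
    ratio = non_empty_ratio(bytes_by_partition)
    return 0 < ratio < threshold


# 20 reduce partitions, 3 of them non-empty at roughly 3.4 MiB each.
# Which partition ids are non-empty is arbitrary in this sketch.
sizes = [0] * 20
for pid in (0, 1, 2):
    sizes[pid] = int(3.4 * MIB)

ratio = non_empty_ratio(sizes)
print(f"non-empty ratio: {ratio:.2f}")
print("demote at default 0.2:", would_demote(sizes))
print("demote at 0:", would_demote(sizes, threshold=0.0))
```

Under that assumed rule, 3 non-empty partitions out of 20 fall below the default, while a threshold of 0 can never be undercut, which matches the two behaviours reported in cases 1) and 2).
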
>  
> In case 1), the bhj is demoted and the `coalesce partitions rule` successfully coalesces these 'small' partitions, even with *spark.sql.adaptive.advisoryPartitionSizeInBytes=1m* set. See:
>  
> !screenshot-1.png!
> Then, as the smj phase shows, the earlier coalescing followed by the later data expansion causes a performance regression:
>  
> {code:java}
> Sort
> sort time total (min, med, max (stageId: taskId))
> 166 ms (0 ms, 55 ms, 57 ms (stage 7.0: task 203))
> peak memory total (min, med, max (stageId: taskId))
> 315.1 MiB (64.0 KiB, 105.0 MiB, 105.0 MiB (stage 7.0: task 201))
> spill size total (min, med, max (stageId: taskId))
> 1845.0 MiB (0.0 B, 615.0 MiB, 615.0 MiB (stage 7.0: task 201))
> {code}
>  
>  
> |1|202|0|SUCCESS| |driver| | |2021-08-06 17:31:18|18 s|4 s|3.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|91 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB| |
> |2|203|0|SUCCESS| |driver| | |2021-08-06 17:31:18|19 s|4 s|4.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|89 B / 1|3.4 MiB / 33333333|615 MiB|4.5 MiB| |
> |0|201|0|SUCCESS| |driver| | |2021-08-06 17:31:18|17 s|4 s|6.0 ms|10.0 ms| | |105.3 MiB|1.0 ms|70 B / 1|3.3 MiB / 33333334|615 MiB|4.4 MiB|
>  
> In case 2), the bhj mode increases the number of tasks, which causes extra scheduling overhead and runs unnecessary empty tasks, but it avoids the OOM risk and the performance regression described above.
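
To put a rough number on the expansion, the figures already reported above can be turned into ratios. This is a back-of-the-envelope sketch, not a measurement: `expansion_factor` is a hypothetical helper, and treating the per-task spill size as the expanded form of the per-task shuffle read is an assumption made for illustration.

```python
# Back-of-the-envelope ratios from the metrics quoted in this issue.
MIB = 1024 ** 2


def expansion_factor(expanded_bytes, compressed_bytes):
    """Return how many times larger the expanded data is than its compressed form."""
    return expanded_bytes / compressed_bytes


# Whole exchange: "data size total" vs "shuffle bytes written total".
overall = expansion_factor(1525.9 * MIB, 10.0 * MIB)

# One coalesced smj task: spill size vs shuffle bytes read (assumed to correspond).
per_task = expansion_factor(615.0 * MIB, 3.4 * MIB)

print(f"exchange expansion: about {overall:.0f}x")
print(f"per-task expansion: about {per_task:.0f}x")
```

Both ratios are well above 100x, so a partition that looks small by its compressed shuffle size can still be very large once it is read and sorted.
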
> h2. A real-world case, in which the data expansion raises the OOM risk to a very high level
>  
>  
> !image-2021-08-06-11-24-34-122.png!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
