You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rui Wang (Jira)" <ji...@apache.org> on 2022/12/14 02:36:00 UTC

[jira] [Commented] (SPARK-41512) Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)

    [ https://issues.apache.org/jira/browse/SPARK-41512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17646898#comment-17646898 ] 

Rui Wang commented on SPARK-41512:
----------------------------------

cc [~cloud_fan]

> Row count based shuffle read to optimize global limit after a single partition shuffle (optionally with input partition sorted)
> -------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-41512
>                 URL: https://issues.apache.org/jira/browse/SPARK-41512
>             Project: Spark
>          Issue Type: Task
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Rui Wang
>            Assignee: Rui Wang
>            Priority: Major
>
> h3. Problem Statement
> In current Spark optimizer, a single partition shuffle might be created for a limit if this limit is not the last non-action operation (e.g. a filter following the limit). There is a possibility that the previous output partitions before go into this limit are sorted. The single partition shuffle approach has a correctness bug in this case: shuffle read partitions could be out of partition order and the limit exec just take the first limit rows which could lose the order thus result into wrong result. This is a shuffle so it is relatively costly. Meanwhile, to correct this bug, a native solution is to sort all the data fed into limit again, which is another overhead. 
> h3. Proposed idea
> So we propose a row count based AQE algorithm that optimizes this problem by two folds:
> Avoid the extra sort on the shuffle read side (or with the limit exec) to achieve the correct result.
> Avoid reading all shuffle data from mappers for this single partition shuffle to reduce shuffle cost.
> Note that 1. is only applied for the sorted partition case where 2. is applied for general single partition shuffle + limit case
>  
> The algorithm works as the following: 
> 1. Each mapper will record a row count when writing shuffle data.
> 2. Since this is single shuffle partition case, there is only one partition but N mappers.
> 3. A accumulatorV2 is implemented to collect a list of tuple which records the mapping between mapper id and the number of row written by the mapper (row count metrics)
> 4. AQE framework detects a plan shape of shuffle plus a global limit.
> 5. AQE framework reads necessary data from mappers based on the limit. For example, if mapper 1 writes 200 rows and mapper 2 writes 300 rows, and the limit is 500, AQE creates shuffle read node to write from both mapper 1 and 2, thus skip the left mappers.
> 6. This is both correct for limit with the sorted or non-sorted partitions.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org