Posted to issues@spark.apache.org by "Mridul Muralidharan (Jira)" <ji...@apache.org> on 2023/01/10 07:49:00 UTC

[jira] [Comment Edited] (SPARK-41953) Shuffle output location refetch during shuffle migration in decommission

    [ https://issues.apache.org/jira/browse/SPARK-41953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17656480#comment-17656480 ] 

Mridul Muralidharan edited comment on SPARK-41953 at 1/10/23 7:48 AM:
----------------------------------------------------------------------

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} request does not make sense with dynamic resource allocation (DRA) when an external shuffle service (ESS) is enabled: we should not be making that call. Thoughts [~Ngone51]? +CC [~csingh]
This also means that relying on ExecutorDeadException when DRA is enabled and ESS is configured won't be useful.

For the rest of the proposal ...

For (2), I am not sure about 'Make MapOutputTracker support fetch latest output without epoch provided.' - this could have nontrivial interactions with other things, and I will need to think it through. I am not sure whether we can model node decommission - where a block moves from host A to host B without any other change - as not requiring an epoch update (or rather, flag the epochs as 'compatible' if there are no interleaving updates); this requires analysis.
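
To make the 'compatible' epochs idea a bit more concrete, here is a toy sketch. It is only an illustration of the question, not a proposal for how MapOutputTracker should work: every type and name below ({{EpochEntry}}, {{Relocated}}, {{compatible}}, and so on) is hypothetical and does not exist in Spark. It assumes each epoch bump is recorded together with the kind of update that caused it.

```scala
// Toy model only: none of these types or methods exist in Spark.
object EpochSketch {
  final case class Location(host: String)

  sealed trait Update
  // A block moved from one host to another; nothing else changed.
  final case class Relocated(mapId: Int, from: Location, to: Location) extends Update
  // Any other change, for example map output lost and recomputed.
  final case class Other(mapId: Int) extends Update

  // One epoch bump together with the update that caused it.
  final case class EpochEntry(epoch: Long, update: Update)

  // Epochs `from` and `to` are treated as compatible when every update
  // recorded after `from`, up to and including `to`, is a pure relocation,
  // i.e. there are no interleaving updates of any other kind.
  def compatible(log: Seq[EpochEntry], from: Long, to: Long): Boolean =
    log
      .filter(e => e.epoch > from && e.epoch <= to)
      .forall(_.update.isInstanceOf[Relocated])
}
```

Under this model, a reader holding state from epoch {{from}} could accept locations from epoch {{to}} only when {{compatible}} returns true. Whether the real tracker can safely make such a distinction is exactly the part that needs analysis.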

Assuming we sort out how to get updated state, (3) looks like a reasonable approach.





was (Author: mridulm80):

A few things:

* Looking at SPARK-27637, we should revisit it - the {{IsExecutorAlive}} request does not make sense with dynamic resource allocation (DRA) when an external shuffle service (ESS) is enabled: we should not be making that call. Thoughts [~Ngone51]? +CC [~csingh]
This also means that relying on ExecutorDeadException when DRA is enabled and ESS is configured won't be useful.

For the rest of the proposal ...

For (2), I am not sure about 'Make MapOutputTracker support fetch latest output without epoch provided.' - this could have nontrivial interactions with other things, and I will need to think it through. I am not sure whether we can model node decommission - where a block moves from host A to host B without any other change - as not requiring an epoch update; this requires analysis.

Assuming we sort out how to get updated state, (3) looks like a reasonable approach.




> Shuffle output location refetch during shuffle migration in decommission
> ------------------------------------------------------------------------
>
>                 Key: SPARK-41953
>                 URL: https://issues.apache.org/jira/browse/SPARK-41953
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Zhongwei Zhu
>            Priority: Major
>
> When shuffle migration is enabled during Spark decommission, shuffle data is migrated to live executors, and the latest location is then updated in MapOutputTracker. This has some issues:
>  # Executors fetch map output locations only at the beginning of the reduce stage, so any shuffle output location change in the middle of the reduce stage causes FetchFailed, because the reducer fetches from the old location. Even though stage retries can resolve this, it still wastes a lot of resources, since all shuffle reads and computation done before the FetchFailed partition are thrown away.
>  # During stage retries, fewer running tasks cause more executors to be decommissioned, so shuffle data locations keep changing. In the worst case, a stage could need many retries, further breaking the SLA.
> So I propose supporting a refetch of map output locations during the reduce phase if shuffle migration is enabled and the FetchFailed is caused by a decommissioned dead executor. The detailed steps are as follows:
>  # When `BlockTransferService` fails to fetch blocks from a decommissioned dead executor, ExecutorDeadException (with isDecommission set to true) will be thrown.
>  # Make MapOutputTracker support fetch latest output without epoch provided.
>  # `ShuffleBlockFetcherIterator` will refetch the latest output from MapOutputTrackerMaster. For all the shuffle blocks on this decommissioned executor, there should be a new location on another executor. If not, throw an exception as is done currently. If yes, create new local and remote requests to fetch these migrated shuffle blocks. The flow will be similar to the fallback fetch used when a push-merged fetch fails.
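
The decision in the last step above can be sketched roughly as follows. This is a toy model under stated assumptions, not Spark code: `BlockId`, `Location`, `Plan`, and `planRefetch` are hypothetical names defined here for illustration, and the `latest` function stands in for whatever mechanism ends up returning the newest map output location.

```scala
// Toy model of the refetch decision; every name here is hypothetical,
// not a Spark API.
object RefetchSketch {
  final case class BlockId(shuffleId: Int, mapId: Long, reduceId: Int)
  final case class Location(executorId: String, host: String)

  sealed trait Plan
  // All failed blocks have a new home: read some locally, fetch the rest.
  final case class Refetch(local: Seq[BlockId], remote: Map[Location, Seq[BlockId]]) extends Plan
  // At least one block has no new location: fail as is done today.
  final case class Fail(missing: Seq[BlockId]) extends Plan

  def planRefetch(
      failed: Seq[BlockId],
      deadExecutor: String,
      latest: BlockId => Option[Location],
      localExecutor: String): Plan = {
    // A block counts as migrated only if its latest known location
    // is on an executor other than the decommissioned one.
    val resolved = failed.map(b => b -> latest(b).filter(_.executorId != deadExecutor))
    val missing = resolved.collect { case (b, None) => b }
    if (missing.nonEmpty) {
      Fail(missing)
    } else {
      val located = resolved.collect { case (b, Some(loc)) => (b, loc) }
      // Blocks that landed on this executor become local reads;
      // the rest are grouped into one remote request per location.
      val (local, remote) = located.partition(_._2.executorId == localExecutor)
      Refetch(
        local.map(_._1),
        remote.groupBy(_._2).map { case (loc, bs) => loc -> bs.map(_._1) })
    }
  }
}
```

The sketch fails the whole batch if any block is missing a new location, which mirrors the "throw exception as current" branch of the proposal; a real implementation might choose a finer granularity.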


