Posted to issues@spark.apache.org by "Zhongwei Zhu (Jira)" <ji...@apache.org> on 2023/01/10 01:46:00 UTC

[jira] [Updated] (SPARK-41953) Shuffle output location refetch during shuffle migration in decommission

     [ https://issues.apache.org/jira/browse/SPARK-41953?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Zhongwei Zhu updated SPARK-41953:
---------------------------------
    Description: 
When shuffle migration is enabled during Spark decommission, shuffle data is migrated to live executors and the latest locations are updated in MapOutputTracker (the relevant configuration is sketched after the list below). This has some issues:
 # Executors only fetch map output locations at the beginning of the reduce stage, so any shuffle output location change in the middle of the reduce phase causes FetchFailed because reducers fetch from the old location. Even though stage retries can work around this, they waste a lot of resources: all shuffle reads and computation done before the FetchFailed partition are thrown away.
 # During stage retries, fewer running tasks cause more executors to be decommissioned, and shuffle data locations keep changing. In the worst case, a stage could need many retries, further breaking the SLA.
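
For reference, a minimal sketch of the configuration that enables this migration, assuming the decommission settings available as of Spark 3.3 (the exact set of options needed depends on the cluster manager):

{code:scala}
import org.apache.spark.SparkConf

// Enable executor decommissioning and shuffle block migration, so shuffle
// files on a decommissioning executor are copied to live executors and
// their new locations are registered with MapOutputTracker.
val conf = new SparkConf()
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
{code}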

So I propose to support refetching map output locations during the reduce phase when shuffle migration is enabled and the FetchFailed is caused by a decommissioned dead executor. The detailed steps are as below (a simplified sketch of the flow follows the list):
 # When `BlockTransferService` fails to fetch blocks from a decommissioned dead executor, an ExecutorDeadException (with isDecommission set to true) will be thrown.
 # Make MapOutputTracker support fetching the latest output locations without an epoch provided.
 # `ShuffleBlockFetcherIterator` will refetch the latest output locations from MapOutputTrackerMaster. For all the shuffle blocks on the decommissioned executor, there should be a new location on another executor. If not, throw an exception as today. If yes, create new local and remote requests to fetch these migrated shuffle blocks. The flow will be similar to the fallback fetch used when a push-merged fetch fails.
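
To make the intended control flow concrete, below is a minimal, self-contained Scala sketch of the proposal. Apart from the ExecutorDeadException name and the isDecommission flag proposed above, the types and methods here (BlockLocation, MapOutputClient.latestLocations, BlockFetcher, fetchWithRefetch) are hypothetical placeholders, not the real Spark internals:

{code:scala}
// Hypothetical sketch of the proposed reduce-side retry. The real code path
// (ShuffleBlockFetcherIterator / MapOutputTracker) is far more involved.

case class BlockLocation(executorId: String, host: String, port: Int)

// Proposed: carry a decommission flag on the dead-executor error (step 1).
class ExecutorDeadException(msg: String, val isDecommission: Boolean)
  extends Exception(msg)

trait MapOutputClient {
  // Step 2: return the latest known locations, ignoring any cached epoch.
  def latestLocations(shuffleId: Int, reduceId: Int): Seq[BlockLocation]
}

trait BlockFetcher {
  def fetch(loc: BlockLocation, shuffleId: Int, reduceId: Int): Array[Byte]
}

// Step 3: when a fetch fails because the executor was decommissioned,
// refetch the latest locations once and retry against the migrated copy
// instead of failing the whole stage.
def fetchWithRefetch(
    tracker: MapOutputClient,
    fetcher: BlockFetcher,
    loc: BlockLocation,
    shuffleId: Int,
    reduceId: Int): Array[Byte] = {
  try {
    fetcher.fetch(loc, shuffleId, reduceId)
  } catch {
    case e: ExecutorDeadException if e.isDecommission =>
      val migrated = tracker.latestLocations(shuffleId, reduceId)
        .find(_.executorId != loc.executorId)
        .getOrElse(throw e) // no migrated copy: fail as today
      fetcher.fetch(migrated, shuffleId, reduceId)
  }
}
{code}

As with the push-merged fallback, only the blocks that lived on the decommissioned executor would be re-requested; data already fetched would not be read again.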


> Shuffle output location refetch during shuffle migration in decommission
> ------------------------------------------------------------------------
>
>                 Key: SPARK-41953
>                 URL: https://issues.apache.org/jira/browse/SPARK-41953
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 3.3.1
>            Reporter: Zhongwei Zhu
>            Priority: Major
>


