You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Juan Rodríguez Hortalá (JIRA)" <ji...@apache.org> on 2017/10/23 23:41:00 UTC

[jira] [Updated] (SPARK-22339) Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors

     [ https://issues.apache.org/jira/browse/SPARK-22339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Juan Rodríguez Hortalá updated SPARK-22339:
-------------------------------------------
    Attachment: push_epoch_update-WIP.diff

> Push epoch updates to executors on fetch failure to avoid fetch retries for missing executors
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22339
>                 URL: https://issues.apache.org/jira/browse/SPARK-22339
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.2.0
>            Reporter: Juan Rodríguez Hortalá
>         Attachments: push_epoch_update-WIP.diff
>
>
> When a task finishes with error due to a fetch error, then DAGScheduler unregisters the shuffle blocks hosted by the serving executor (or even all the executors in the failing host, with external shuffle and spark.files.fetchFailure.unRegisterOutputOnHost enabled) in the shuffle block directory stored by MapOutputTracker, that then increments its epoch as a result. This event is only signaled to the other executors when a new task with a new epoch starts in each executor. This means that other executors reading from the failed executors will retry fetching shuffle blocks from them, even though the driver already knows those executors are lost and those blocks are now unavailable at those locations. This impacts job runtime, specially for long shuffles and executor failures at the end of a stage, when the only pending tasks are shuffle reads. 
> This could be improved by pushing the epoch update to the executors without having to wait for a new task. In the attached patch I sketch a possible solution that sends the updated epoch from the driver to the executors by piggybacking on the executor heartbeat response. ShuffleBlockFetcherIterator, RetryingBlockFetcher and BlockFetchingListener are modified so blocks locations are checked on each fetch retry. This doesn't introduce additional traffic, as MapOutputTrackerWorker.mapStatuses is shared by all tasks running on the same Executor, and the lookup of the new shuffle blocks directory was going to happen anyway when the new epoch is detected during the start of the next task. 
> I would like to know the opinion of the community on this approach. 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org