Posted to issues@spark.apache.org by "Thomas Graves (JIRA)" <ji...@apache.org> on 2019/05/16 18:07:00 UTC

[jira] [Commented] (SPARK-27736) Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations

    [ https://issues.apache.org/jira/browse/SPARK-27736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16841593#comment-16841593 ] 

Thomas Graves commented on SPARK-27736:
---------------------------------------

Yeah, we always ran YARN with NodeManager recovery on, but that doesn't help standalone mode unless you implement something similar. Either way, I think documenting it for YARN is a good idea.

We used to see transient fetch failures all the time because of temporary spikes in disk usage, so I would be hesitant to turn on spark.files.fetchFailure.unRegisterOutputOnHost by default. On the other hand, users could turn it back off, so it depends on what people think is the more common case.
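
For anyone who wants to experiment with this on a single job rather than changing the cluster-wide default, it is just a conf flip; a minimal sketch (the builder and app name here are illustrative):

{code:scala}
import org.apache.spark.sql.SparkSession

// Enable host-wide unregistration of map outputs on fetch failure for this job only,
// leaving the cluster default (false) untouched.
val spark = SparkSession.builder()
  .appName("unregister-on-host-example")
  .config("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
  .getOrCreate()
{code}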

I don't think you can assume that the death of the shuffle service (the NM on YARN) implies the death of the executors. We have seen NodeManagers go down with an OOM while the executors stay up, and without the NM there, there isn't really anything to clean up the containers on that node. You will obviously get fetch failures from that node if it does go down.

Your last option seems like the best of those, but, as you mention, it could get a bit ugly with the string matching.

The other thing you can do is start tracking those fetch failures and have the driver make a more informed decision based on them. This is work we had started at my previous employer but never had time to finish. It's a much bigger change, but it is really what we should be doing: it would let us make better decisions about blacklisting and tell whether it was the map node or the reduce node that had the issues, etc.
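
To make the idea concrete, this is roughly the kind of bookkeeping I mean; a hypothetical sketch only, not existing Spark code (the class and method names are made up):

{code:scala}
import scala.collection.mutable

// Hypothetical driver-side tracker: count fetch failures attributed to the host
// serving the map output, so the scheduler could tell a transient hiccup apart
// from a host whose shuffle data is really gone before unregistering everything.
class FetchFailureTracker(threshold: Int = 3) {
  private val failuresByHost = mutable.Map.empty[String, Int].withDefaultValue(0)

  def recordFailure(mapHost: String): Unit =
    failuresByHost(mapHost) += 1

  // Only escalate to host-wide cleanup (or blacklisting) after repeated failures.
  def shouldUnregisterHost(mapHost: String): Boolean =
    failuresByHost(mapHost) >= threshold
}
{code}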

 

> Improve handling of FetchFailures caused by ExternalShuffleService losing track of executor registrations
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27736
>                 URL: https://issues.apache.org/jira/browse/SPARK-27736
>             Project: Spark
>          Issue Type: Bug
>          Components: Shuffle
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Minor
>
> This ticket describes a fault-tolerance edge-case which can cause Spark jobs to fail if a single external shuffle service process reboots and fails to recover the list of registered executors (something which can happen when using YARN if NodeManager recovery is disabled) _and_ the Spark job has a large number of executors per host.
> I believe this problem can be worked around today via a change of configurations, but I'm filing this issue to (a) better document this problem, and (b) propose either a change of default configurations or additional DAGScheduler logic to better handle this failure mode.
> h2. Problem description
> The external shuffle service process is _mostly_ stateless except for a map tracking the set of registered applications and executors.
> When processing a shuffle fetch request, the shuffle service first checks whether the executor for the requested block ID is registered; if it is not registered, the shuffle service throws an exception like
> {code:java}
> java.lang.RuntimeException: Executor is not registered (appId=application_1557557221330_6891, execId=428){code}
> and this exception becomes a {{FetchFailed}} error in the executor requesting the shuffle block.
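> A simplified sketch of that lookup (illustrative Scala only; the real check lives in the shuffle service's Java code):
> {code:scala}
> // The external shuffle service keeps an in-memory map of registered executors.
> // After a restart that loses this map, every lookup for a pre-restart executor fails.
> case class AppExecId(appId: String, execId: String)
>
> class RegistrationCheck {
>   private val executors = new java.util.concurrent.ConcurrentHashMap[AppExecId, String]()
>
>   def register(appId: String, execId: String, localDirs: String): Unit =
>     executors.put(AppExecId(appId, execId), localDirs)
>
>   def getBlockData(appId: String, execId: String, blockId: String): String = {
>     val dirs = executors.get(AppExecId(appId, execId))
>     if (dirs == null) {
>       throw new RuntimeException(
>         s"Executor is not registered (appId=$appId, execId=$execId)")
>     }
>     dirs // the real service resolves the block file under the executor's local dirs
>   }
> }
> {code}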
> In normal operation this error should not occur because executors shouldn't be mis-routing shuffle fetch requests. However, this _can_ happen if the shuffle service crashes and restarts, causing it to lose its in-memory executor registration state. With YARN this state can be recovered from disk if YARN NodeManager recovery is enabled (using the mechanism added in SPARK-9439), but I don't believe that we perform state recovery in Standalone and Mesos modes (see SPARK-24223).
> If state cannot be recovered then map outputs cannot be served (even though the files probably still exist on disk). In theory, this shouldn't cause Spark jobs to fail because we can always redundantly recompute lost / unfetchable map outputs.
> However, in practice this can cause total job failures in deployments where the node with the failed shuffle service was running a large number of executors: by default, the DAGScheduler unregisters map outputs _only from the individual executor whose shuffle blocks could not be fetched_ (see [code|https://github.com/apache/spark/blame/bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1643]), so it can take several rounds of failed stage attempts to clear the outputs of all executors on the faulty host. If the number of executors on a host is greater than the stage retry limit, this can exhaust stage retry attempts and cause job failures.
> This "multiple rounds of recomputation to discover all failed executors on a host" problem was addressed by SPARK-19753, which added a {{spark.files.fetchFailure.unRegisterOutputOnHost}} configuration which promotes executor fetch failures into host-wide fetch failures (clearing output from all neighboring executors upon a single failure). However, that configuration is {{false}} by default.
> h2. Potential solutions
> I have a few ideas about how we can improve this situation:
>  - Update the [YARN external shuffle service documentation|https://spark.apache.org/docs/latest/running-on-yarn.html#configuring-the-external-shuffle-service] to recommend enabling node manager recovery.
>  - Consider defaulting {{spark.files.fetchFailure.unRegisterOutputOnHost}} to {{true}}. This would improve out-of-the-box resiliency for large clusters. The trade-off here is a reduction of efficiency in case there are transient "false positive" fetch failures, but I suspect this case may be unlikely in practice (so the change of default could be an acceptable trade-off). See [prior discussion on GitHub|https://github.com/apache/spark/pull/18150#discussion_r119736751].
>  - Modify DAGScheduler to add special-case handling for "Executor is not registered" exceptions that trigger FetchFailures: if we see this exception then it implies that the shuffle service failed to recover state, implying that all of its prior outputs are effectively unavailable. In this case, it _might_ be safe to unregister all host outputs irrespective of whether the {{unRegisterOutputOnHost}} flag is set.
>  -- This might require us to string-match on exceptions (so we can be backwards-compatible with old shuffle services, freeing users from needing to upgrade / restart NMs to pick up this fix); see the rough sketch after this list.
>  -- I suppose there's the potential for race conditions where the shuffle service restarts and produces _new_ map outputs from freshly-registered executors, only for us to turn around and unnecessarily clear those outputs as part of the cleanup of the pre-shuffle-service-restart outputs. If we assume shuffle service and executor deaths are coupled (i.e. that death of the shuffle service process implies death of all executors, something which I believe is true of both YARN NM and Standalone Worker death) then we could be a bit more precise and invalidate outputs from all _dead_ executors on that host.
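> As a rough illustration of the string-matching idea mentioned above (a purely hypothetical helper, names invented):
> {code:scala}
> // Hypothetical check: recognize the "lost registration state" case from the
> // FetchFailed error message so the scheduler could escalate to host-wide cleanup
> // even when unRegisterOutputOnHost is false. Matching on the message keeps it
> // backwards-compatible with old shuffle services.
> def isUnregisteredExecutorFailure(fetchFailedMessage: String): Boolean =
>   fetchFailedMessage != null &&
>     fetchFailedMessage.contains("Executor is not registered")
> {code}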
> I'm open to other suggestions, too.
> /cc [~imranr] [~sitalkedia@gmail.com] [~tgraves] [~vanzin] as FYI (since I think you've all worked on relevant parts of this code).


