Posted to issues@spark.apache.org by "Junfan Zhang (Jira)" <ji...@apache.org> on 2022/05/05 02:56:00 UTC

[jira] [Updated] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manager in Spark

     [ https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Junfan Zhang updated SPARK-30873:
---------------------------------
    Summary: Handling Node Decommissioning for Yarn cluster manager in Spark  (was: Handling Node Decommissioning for Yarn cluster manAger in Spark)

> Handling Node Decommissioning for Yarn cluster manager in Spark
> ---------------------------------------------------------------
>
>                 Key: SPARK-30873
>                 URL: https://issues.apache.org/jira/browse/SPARK-30873
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, YARN
>    Affects Versions: 3.1.0
>            Reporter: Saurabh Chawla
>            Priority: Major
>
> In many public cloud environments, node loss (in the case of AWS Spot loss, Spot Blocks and GCP preemptible VMs) is a planned and informed activity. 
> The cloud provider notifies the cluster manager about the possible loss of a node ahead of time. A few examples are listed here:
> a) Spot instance loss in AWS (2 minutes before the event)
> b) GCP preemptible VM loss (30 seconds before the event)
> c) AWS Spot Block loss with information on the termination time (generally a few tens of minutes before decommission, as configured in YARN)
> This JIRA tries to make Spark leverage this advance knowledge of node loss and adjust the scheduling of tasks to minimise the impact on the application. 
> It is well known that when a host is lost, its executors, their running tasks, their caches and their shuffle data are lost as well. This can result in wasted compute and other resources.
> The focus here is to build a framework for YARN that can be extended to other cluster managers to handle this scenario.
> The framework must handle one or more of the following:
> 1) Prevent new tasks from starting on any executor on a decommissioning node. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere (assuming they will not complete within the deadline), OR allow them to continue in the hope that they finish within the deadline.
> 3) Clear the shuffle data entries for the decommissioned node's hostname from the MapOutputTracker to prevent shuffle FetchFailed exceptions. The most significant advantage of unregistering the shuffle outputs is that when Spark schedules the first re-attempt to compute the missing blocks, it notices all of the missing blocks from decommissioned nodes and recovers in only one attempt. This speeds up the recovery process significantly over the current scheduler behaviour, where stages might be rescheduled multiple times to recompute missing shuffles from all nodes, and it prevents jobs from being stuck for hours failing and recomputing.
> 4) Prevent the stage from aborting due to FetchFailed exceptions caused by the decommissioning of a node. Spark allows a number of consecutive stage attempts before a stage is aborted; this is controlled by the config spark.stage.maxConsecutiveAttempts. Not counting fetch failures caused by node decommissioning towards stage failure improves the reliability of the system (a sketch of this follows the list).
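> A minimal sketch of the intent behind point 4), assuming a driver-side check that knows which hosts are decommissioning; StageAttemptTracker, onFetchFailure and isDecommissionedHost are illustrative names only, not the actual Spark internals:
> {code:scala}
> // Hypothetical, simplified driver-side sketch of point 4).
> // Names and signatures are illustrative, not the real Spark scheduler code.
> class StageAttemptTracker(maxConsecutiveAttempts: Int, isDecommissionedHost: String => Boolean) {
>   private var consecutiveFailedAttempts = 0
>   /** Returns true if the stage should be aborted after a FetchFailed from `host`. */
>   def onFetchFailure(host: String): Boolean = {
>     if (isDecommissionedHost(host)) {
>       // Point 4): fetch failures caused by decommissioning do not count
>       // towards spark.stage.maxConsecutiveAttempts; the stage is simply retried.
>       false
>     } else {
>       consecutiveFailedAttempts += 1
>       consecutiveFailedAttempts >= maxConsecutiveAttempts
>     }
>   }
> }
> {code}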
> Main components of the change:
> 1) Get the ClusterInfo update from the Resource Manager -> Application Master -> Spark driver.
> 2) DecommissionTracker, residing inside the driver, tracks all the decommissioned nodes and takes the necessary actions and state transitions.
> 3) Based on the decommissioned-node list, add hooks in the code (a sketch follows this list) to achieve:
>  a) No new tasks on executors of decommissioning nodes
>  b) Removal of the shuffle data mapping info for the node to be decommissioned from the MapOutputTracker
>  c) Not counting FetchFailures from decommissioned nodes towards stage failure
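> A minimal sketch of components 1) and 2) together with hook a), assuming the Application Master forwards the hostname and termination time to the driver; NodeDecommissionInfo, onNodeDecommissioning and canScheduleOn are made-up names, not existing Spark APIs:
> {code:scala}
> // Illustrative sketch of the information flow and the "no new tasks" hook (a).
> case class NodeDecommissionInfo(hostname: String, terminationTimeMs: Long)
>
> class DecommissionTracker {
>   @volatile private var decommissioning = Map.empty[String, NodeDecommissionInfo]
>
>   /** Called on the driver when the AM relays a decommission notice from the RM. */
>   def onNodeDecommissioning(info: NodeDecommissionInfo): Unit = synchronized {
>     decommissioning += (info.hostname -> info)
>   }
>
>   /** Scheduler hook (a): only offer task slots on hosts that are not going away. */
>   def canScheduleOn(executorHost: String): Boolean =
>     !decommissioning.contains(executorHost)
> }
> {code}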
> On receiving the information that a node is to be decommissioned, the DecommissionTracker on the driver needs to perform the actions below (a sketch of the timing computation follows this list):
>  * Add an entry for the node in the DecommissionTracker with its termination time and the node state "DECOMMISSIONING".
>  * Stop assigning any new tasks to executors on the nodes that are candidates for decommissioning. This ensures that, as the running tasks finish, the usage of the node gradually dies down.
>  * Kill all the executors on the decommissioning nodes after a configurable period of time, say "spark.graceful.decommission.executor.leasetimePct". This killing ensures two things: firstly, the task failures will be attributed to the job failure count; secondly, it avoids generating more shuffle data on a node that will eventually be lost. The node state is set to "EXECUTOR_DECOMMISSIONED". 
>  * Mark the shuffle data on the node as unavailable after the "spark.qubole.graceful.decommission.shuffledata.leasetimePct" time. This ensures that the recomputation of missing shuffle partitions is done early, rather than reducers failing with a time-consuming FetchFailure. The node state is set to "SHUFFLEDATA_DECOMMISSIONED". 
>  * Mark the node as terminated after the termination time. At this point the state of the node is "TERMINATED".
>  * Remove the node entry from the DecommissionTracker if the same hostname is reused (this is not uncommon in many public cloud environments).
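> A rough sketch of how the two lease-time percentages above could be turned into absolute deadlines, assuming each is interpreted as a percentage of the window between the decommission notice and the termination time; the interpretation, the object and the parameter names are assumptions for illustration only:
> {code:scala}
> // Illustrative only: derives the executor-kill and shuffle-unregister deadlines
> // from the termination time and the two proposed percentage configs.
> object DecommissionTimeline {
>   case class Deadlines(killExecutorsAtMs: Long, unregisterShuffleAtMs: Long, terminateAtMs: Long)
>
>   def compute(
>       notifiedAtMs: Long,        // when the decommission notice arrived at the driver
>       terminationTimeMs: Long,   // when the cloud provider reclaims the node
>       executorLeasePct: Long,    // spark.graceful.decommission.executor.leasetimePct
>       shuffleDataLeasePct: Long  // spark.qubole.graceful.decommission.shuffledata.leasetimePct
>   ): Deadlines = {
>     val window = terminationTimeMs - notifiedAtMs
>     Deadlines(
>       killExecutorsAtMs = notifiedAtMs + window * executorLeasePct / 100,
>       unregisterShuffleAtMs = notifiedAtMs + window * shuffleDataLeasePct / 100,
>       terminateAtMs = terminationTimeMs)
>   }
> }
> {code}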
> This is the life cycle of a decommissioned node:
> DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED.
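> A small illustrative sketch of that life cycle as a state machine (the class and method names are not proposed names, just illustration):
> {code:scala}
> // Illustrative sketch of the per-node life cycle described above.
> object NodeState extends Enumeration {
>   val Decommissioning, ExecutorDecommissioned, ShuffleDataDecommissioned, Terminated = Value
> }
>
> object NodeLifecycle {
>   import NodeState._
>   /** Next state in DECOMMISSIONING -> EXECUTOR_DECOMMISSIONED -> SHUFFLEDATA_DECOMMISSIONED -> TERMINATED. */
>   def nextState(state: NodeState.Value): Option[NodeState.Value] = state match {
>     case Decommissioning           => Some(ExecutorDecommissioned)    // executors killed
>     case ExecutorDecommissioned    => Some(ShuffleDataDecommissioned) // shuffle data unregistered
>     case ShuffleDataDecommissioned => Some(Terminated)                // node reclaimed by the cloud provider
>     case Terminated                => None
>   }
> }
> {code}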
> *Why do we decommission the executors before the shuffle data?* There are two reasons for killing the executors before marking the shuffle data as unavailable:
> a) As per the current logic, whenever we receive the node decommissioning notification we stop assigning new tasks to the executors running on that node, and we give the tasks already running on those executors some time to complete before killing the executors. If we kept the executors running until the end, there would be a chance of generating more shuffle data that will eventually be lost, triggering a recompute later. This approach minimises the recomputation of shuffle data and maximises the usage of the shuffle data on the node by keeping it available until the end.
> b) We want to keep the shuffle data until the node is about to be lost, so that any tasks that depend on that shuffle data can still complete, and we do not have to recompute the shuffle data at all if no task needs it.


