Posted to issues@spark.apache.org by "Dongwook Kwon (Jira)" <ji...@apache.org> on 2020/10/30 22:10:00 UTC

[jira] [Commented] (SPARK-29683) Job failed due to executor failures all available nodes are blacklisted

    [ https://issues.apache.org/jira/browse/SPARK-29683?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17223929#comment-17223929 ] 

Dongwook Kwon commented on SPARK-29683:
---------------------------------------

I agree with Genmao; the logic added by SPARK-16630 is too strong a condition for failing the application.

 
{code:java}
def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes
val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
{code}
 

I think the above logic only works for a partial failure or an intermittent issue where numClusterNodes does not change. If a scheduler or allocator failure becomes a permanent failure, so that the ResourceManager has to remove the node from its pool and numClusterNodes changes, the above logic can fail the application.

e.g.) Say a cluster has 2 NodeManagers (numClusterNodes = 2), and one NodeManager (N1) has issues that cause scheduling failures, which ends up increasing schedulerBlacklist.size to 1. Later, N1 cannot recover from the ResourceManager's perspective, due to a hardware failure, being decommissioned by an operator, or some other reason. In this case numClusterNodes becomes 1, which makes isAllNodeBlacklisted true, even though there is still 1 NodeManager available and "spark.yarn.blacklist.executor.launch.blacklisting.enabled" is set to false.
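To make the arithmetic of that scenario concrete, here is a minimal, self-contained sketch. It is not the actual YarnAllocatorBlacklistTracker code; the names just mirror it for illustration.
{code:java}
// Hypothetical walk-through of the scenario above. Only the comparison
// "blacklisted.size >= numClusterNodes" is taken from the real check.
object BlacklistScenario {
  def isAllNodeBlacklisted(blacklisted: Set[String], numClusterNodes: Int): Boolean =
    blacklisted.size >= numClusterNodes

  def main(args: Array[String]): Unit = {
    // N1 ends up on the scheduler blacklist after repeated scheduling failures.
    val schedulerBlacklist = Set("N1")

    // While N1 is still registered: 1 >= 2 is false, the application keeps running.
    println(isAllNodeBlacklisted(schedulerBlacklist, numClusterNodes = 2)) // false

    // After N1 is decommissioned / lost, the blacklist still contains N1,
    // but numClusterNodes drops to 1, so 1 >= 1 is true and the application
    // is failed even though the remaining NodeManager (N2) is healthy.
    println(isAllNodeBlacklisted(schedulerBlacklist, numClusterNodes = 1)) // true
  }
}
{code}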

Particularly in cloud environments, cluster resizing happens all the time. For a long-running Spark application with many cluster resize operations, schedulerBlacklist.size can keep increasing while numClusterNodes keeps fluctuating. In addition, even when currentBlacklistedYarnNodes.size >= numClusterNodes is momentarily true, new nodes may be added again quickly.

I found that [e70df2cea46f71461d8d401a420e946f999862c1|https://github.com/apache/spark/commit/e70df2cea46f71461d8d401a420e946f999862c1] was added to handle the case of numClusterNodes = 0.

However, for the other cases mentioned in this JIRA, I think simply removing the following part from [ApplicationMaster|https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala#L535-L538] would make more sense, because isAllNodeBlacklisted does not necessarily mean the running application needs to fail.

 
{code:java}
} else if (allocator.isAllNodeBlacklisted) {
  finish(FinalApplicationStatus.FAILED,
    ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
    "Due to executor failures all available nodes are blacklisted")
{code}
 

Or, at the very least, the above condition should be applied optionally, gated by "spark.yarn.blacklist.executor.launch.blacklisting.enabled" or some new configuration, because SPARK-16630 was added as optional but the above logic takes effect regardless of any configuration.
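For example, the failure branch could be gated behind a flag, roughly along these lines. This is only a sketch, not a patch: "spark.yarn.am.failOnAllNodesBlacklisted" is a hypothetical configuration name used here just to show the shape of the change.
{code:java}
// Rough sketch only. The config key below is hypothetical and would need to be
// defined properly (e.g. in the YARN config package) before anything like this
// could land in ApplicationMaster.
} else if (sparkConf.getBoolean("spark.yarn.am.failOnAllNodesBlacklisted", false) &&
    allocator.isAllNodeBlacklisted) {
  finish(FinalApplicationStatus.FAILED,
    ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
    "Due to executor failures all available nodes are blacklisted")
{code}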

I would like to hear others' opinions on this.

 

> Job failed due to executor failures all available nodes are blacklisted
> -----------------------------------------------------------------------
>
>                 Key: SPARK-29683
>                 URL: https://issues.apache.org/jira/browse/SPARK-29683
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, YARN
>    Affects Versions: 3.0.0
>            Reporter: Genmao Yu
>            Priority: Major
>
> My streaming job will fail *due to executor failures all available nodes are blacklisted*. This exception is thrown only when all nodes are blacklisted:
> {code:java}
> def isAllNodeBlacklisted: Boolean = currentBlacklistedYarnNodes.size >= numClusterNodes
> val allBlacklistedNodes = excludeNodes ++ schedulerBlacklist ++ allocatorBlacklist.keySet
> {code}
> After diving into the code, I found some critical conditions that are not handled properly:
>  - unchecked `excludeNodes`: it comes from user config. If not set properly, it may lead to "currentBlacklistedYarnNodes.size >= numClusterNodes". For example, we may set some nodes that are not in the Yarn cluster.
> {code:java}
> excludeNodes = (invalid1, invalid2, invalid3)
> clusterNodes = (valid1, valid2)
> {code}
>  - `numClusterNodes` may equal 0: during a Yarn HA failover, it takes some time for all NodeManagers to register with the ResourceManager again. In this case, `numClusterNodes` may equal 0 or some other small number, and the Spark driver fails.
>  - too strong a condition check: the Spark driver will fail as long as "currentBlacklistedYarnNodes.size >= numClusterNodes". This condition does not necessarily indicate an unrecoverable fatal error; for example, some NodeManagers may just be restarting. So we could allow some waiting time before failing the job.


