Posted to issues@spark.apache.org by "Harry Brundage (JIRA)" <ji...@apache.org> on 2014/12/04 01:07:12 UTC

[jira] [Updated] (SPARK-4732) All application progress on the standalone scheduler can be halted by one systematically faulty node

     [ https://issues.apache.org/jira/browse/SPARK-4732?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Harry Brundage updated SPARK-4732:
----------------------------------
    Description: 
We've experienced several cluster-wide outages caused by unexpected system-wide faults on one of our Spark workers when that worker is failing systematically. By systematically, I mean that every executor launched by that worker will definitely fail for some reason outside of Spark's control, like the log directory's disk being completely out of space, or a permissions error on a file that's always read during executor launch. We screw up all the time on our team and cause stuff like this to happen, but because of the way the standalone scheduler allocates resources, our cluster doesn't recover gracefully from these failures.

When there are more tasks to do than executors, I am pretty sure the way the scheduler works is that it simply waits for more resource offers and then allocates tasks from the queue to those resources. If an executor dies immediately after starting, the worker monitor process notices that it's dead. The master then allocates that worker's now-free cores/memory to a currently running application that is below its spark.cores.max, which in our case I've observed is usually the app that just had the executor die. A new executor gets spawned on the same worker the last one just died on, gets allocated that one task that failed, and the whole process fails again for the same systematic reason: lather, rinse, repeat. This happens 10 times, or whatever the max task failure count is, and then the whole app is deemed a failure by the driver and shut down completely.
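For concreteness, here is a minimal sketch of the two settings in play as I understand them; the master URL, app name, and values below are illustrative only, not our real configuration:

import org.apache.spark.{SparkConf, SparkContext}

object RetryLoopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("spark://master:7077")      // standalone scheduler
      .setAppName("retry-loop-example")
      .set("spark.cores.max", "40")          // cap the master fills when it hands freed cores back to the app
      .set("spark.task.maxFailures", "10")   // the "max task failure count" above; once hit, the app is failed
    val sc = new SparkContext(conf)
    // With one systematically broken worker, the same failed task can keep
    // landing on that worker's replacement executors until the limit is hit.
    sc.stop()
  }
}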

For us, we usually run roughly as many cores as we have Hadoop nodes. We also usually have many more input splits than we have tasks, which means the locality of the first few tasks, which I believe determines where our executors run, is well spread out over the cluster and often covers 90-100% of nodes. This means the likelihood of any application getting an executor scheduled on a broken node is quite high. So, in my experience, after an application goes through the above-mentioned process and dies, the next application to start (or the next one not yet at its requested max capacity) gets an executor scheduled on the broken node and is promptly taken down as well. This happens over and over, to the point where none of our Spark jobs make any progress because of one tiny permissions mistake on one node.

Now, I totally understand this is usually an "error between keyboard and screen" kind of situation, where it is the responsibility of the people deploying Spark to ensure it is deployed correctly. The systematic issues we've encountered are almost always of this nature: permissions errors, disk-full errors, one node not getting a new Spark jar because of a configuration error, configurations being out of sync, and so on. That said, disks are going to fail or half-fail, fill up, node rot is going to ruin configurations, and as Hadoop clusters scale in size this becomes more and more likely, so I think it's reasonable to ask that Spark be resilient to this kind of failure and keep on truckin'.

I think a good, simple fix would be to have applications, or the master, blacklist workers (not executors) at a failure count lower than the task failure count. This would also serve as a belt-and-suspenders fix for SPARK-4498. If the scheduler stopped trying to schedule on nodes that fail a lot, we could still make progress. These blacklist events are really important and would need to be well logged and surfaced in the UI, but I'd rather log and carry on than fail hard. The tradeoff is that you risk blacklisting every worker if there is something systematically wrong with communication, or something else I can't imagine.
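To make the idea concrete, here is a rough sketch of the kind of per-worker failure tracking I have in mind. This is hypothetical code, not anything that exists in Spark today; the class name and the threshold of 3 are made up for illustration:

import scala.collection.mutable

class WorkerBlacklist(maxFailuresPerWorker: Int = 3) {
  private val failures = mutable.Map.empty[String, Int].withDefaultValue(0)

  /** Record an executor that died immediately after launching on this worker. */
  def recordFailure(workerId: String): Unit = failures(workerId) += 1

  /** True once a worker has crossed the threshold and should get no more executors. */
  def isBlacklisted(workerId: String): Boolean = failures(workerId) >= maxFailuresPerWorker

  /** Filter resource offers down to usable workers, logging the ones we skip. */
  def usable(workerIds: Seq[String]): Seq[String] = {
    val (bad, good) = workerIds.partition(isBlacklisted)
    bad.foreach(w => println(s"WARN skipping blacklisted worker $w after ${failures(w)} executor failures"))
    good
  }
}

The important part is just that the per-worker threshold sits below the task failure limit, so a broken worker gets dropped before an application burns through all of its retries.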

Please let me know if I've misunderstood how the scheduler works, or if you need more information, and I'll be happy to provide it.



> All application progress on the standalone scheduler can be halted by one systematically faulty node
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-4732
>                 URL: https://issues.apache.org/jira/browse/SPARK-4732
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.0, 1.2.0
>         Environment:  - Spark Standalone scheduler
>            Reporter: Harry Brundage
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org