Posted to dev@spark.apache.org by Ankur Gupta <an...@cloudera.com.INVALID> on 2019/04/01 16:04:34 UTC

Re: [DISCUSS] Enable blacklisting feature by default in 3.0

Hi Chris,

Thanks for sending over the example. As far as I can understand, this would
not have been a problem if
"spark.blacklist.application.maxFailedTasksPerExecutor" had been set to a
higher threshold, as mentioned in my previous email.

Though, with 7 or 8 executors and 2 failedTasksPerExecutor, if the
application runs out of executors, that would imply at least 14 task
failures (7 executors x 2 failures each) in a short period of time. So, I am
not sure whether the application should still continue to run or fail. If
this was not a transient issue, maybe failing was the correct outcome, as it
saves a lot of unnecessary computation and also alerts admins to look for
transient/permanent hardware failures.

Please let me know if you think we should enable the blacklisting feature by
default with the higher thresholds.
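
For reference, a minimal sketch of what opting in with those higher
thresholds looks like today (the config keys are the existing
spark.blacklist.* settings; the values are just the proposed defaults under
discussion, not anything Spark currently ships with):

  import org.apache.spark.SparkConf
  import org.apache.spark.sql.SparkSession

  // Enable blacklisting explicitly and relax the thresholds to the
  // proposed higher defaults.
  val conf = new SparkConf()
    .set("spark.blacklist.enabled", "true")
    .set("spark.blacklist.task.maxTaskAttemptsPerExecutor", "2")
    .set("spark.blacklist.task.maxTaskAttemptsPerNode", "2")
    .set("spark.blacklist.stage.maxFailedTasksPerExecutor", "5")
    .set("spark.blacklist.stage.maxFailedExecutorsPerNode", "4")
    .set("spark.blacklist.application.maxFailedTasksPerExecutor", "5")
    .set("spark.blacklist.application.maxFailedExecutorsPerNode", "4")
    .set("spark.blacklist.timeout", "5m")

  val spark = SparkSession.builder().config(conf).getOrCreate()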

Thanks,
Ankur

On Fri, Mar 29, 2019 at 3:23 PM Chris Stevens <ch...@databricks.com>
wrote:

> Hey All,
>
> My initial reply got lost, because I wasn't on the dev list. Hopefully
> this goes through.
>
> Back story for my experiments: customer was hitting network errors due to
> cloud infrastructure problems. Basically, executor X couldn't fetch from Y.
> The NIC backing the VM for executor Y was swallowing packets. I wanted to
> blacklist node Y.
>
> What I learned:
>
> 1. `spark.blacklist.application.fetchFailure.enabled` requires
> `spark.blacklist.enabled` to also be enabled (BlacklistTracker isn't
> created
> <https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L948> without
> the latter). This was a problem because the defaults for
> `spark.blacklist.[task|stage|application].*` are aggressive and don't even
> apply to fetch failures. Those are always treated as non-transient. It
> would be nice to have fetch blacklisting without regular blacklisting.
>
> 2. Due to the conf coupling in #1 and transient cloud storage errors in
> the job (FileScanRDD was failing due to corrupted files), I had to set the
> `max*PerExecutor` and `max*PerNode` to really high values (e.g. 1000).
> Without these high settings, the customer was running out of nodes on the
> cluster (as we don't have blacklisting enabled by default, we haven't
> hooked it up to any sort of dynamic cloud VM re-provisioning - something
> like `killBlacklistedNodes`). Why? The same transient FileScanRDD failure
> hit over multiple stages, so even though executors were aggressively
> removed within one
> stage, `spark.blacklist.application.maxFailedTasksPerExecutor = 2` was
> reached. The stages were succeeding because the FileScanRDD attempts on
> other executors succeeded. As such, the 8 node cluster ran out of executors
> after 3 stages. I did not have `spark.blacklist.killBlacklistedExecutors` enabled.
> If I did, then `spark.blacklist.application.maxFailedExecutorsPerNode`
> would have kicked in and the job might have failed after 4-6 stages,
> depending on how it played out. (FWIW, this was running one executor per
> node).
>
> -Chris
>
> On Fri, Mar 29, 2019 at 1:48 PM Ankur Gupta <an...@cloudera.com>
> wrote:
>
>> Thanks Reynold! That is certainly useful to know.
>>
>> @Chris Would it be possible for you to send out those details if you
>> still have them, or better, create a JIRA so someone can work on those
>> improvements? If there is already a JIRA, can you please provide a link
>> to it?
>>
>> Additionally, if the concern is with the aggressiveness of the
>> blacklisting, then we can enable the blacklisting feature by default with
>> higher thresholds for failures. Below is an alternate set of defaults
>> that was also proposed in the design document for max cluster utilization:
>>
>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 2
>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 5
>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 4
>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 5
>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 4
>>    7. spark.blacklist.timeout = 5 mins
>>
>>
>>
>> On Fri, Mar 29, 2019 at 11:18 AM Reynold Xin <rx...@databricks.com> wrote:
>>
>>> We tried enabling blacklisting for some customers, and in the cloud they
>>> very quickly ended up with 0 executors due to various transient errors. So
>>> unfortunately I think the current implementation is terrible for cloud
>>> deployments, and shouldn't be on by default. The heart of the issue is that
>>> the current implementation is not great at dealing with transient errors
>>> vs. catastrophic errors.
>>>
>>> +Chris who was involved with those tests.
>>>
>>>
>>>
>>> On Thu, Mar 28, 2019 at 3:32 PM, Ankur Gupta <
>>> ankur.gupta@cloudera.com.invalid> wrote:
>>>
>>>> Hi all,
>>>>
>>>> This is a follow-on to my PR:
>>>> https://github.com/apache/spark/pull/24208, where I aimed to enable
>>>> blacklisting for fetch failure by default. From the comments, there is
>>>> interest in the community in enabling the overall blacklisting feature by
>>>> default. I have listed 3 different things that we can do and would
>>>> like to gather feedback and see if anyone has objections with regard to
>>>> this. Otherwise, I will just create a PR for it.
>>>>
>>>> 1. *Enable the blacklisting feature by default*. The blacklisting feature
>>>> was added as part of SPARK-8425 and has been available since 2.2.0. This
>>>> feature was deemed experimental and was disabled by default. The feature
>>>> blacklists an executor/node from running a particular task, any task in a
>>>> particular stage, or all tasks in the application, based on the number of
>>>> failures. There are various configurations available which control those
>>>> thresholds. Additionally, the executor/node is only blacklisted for a
>>>> configurable time period. The idea is to enable the blacklisting feature
>>>> with the existing defaults, which are the following:
>>>>
>>>>    1. spark.blacklist.task.maxTaskAttemptsPerExecutor = 1
>>>>    2. spark.blacklist.task.maxTaskAttemptsPerNode = 2
>>>>    3. spark.blacklist.stage.maxFailedTasksPerExecutor = 2
>>>>    4. spark.blacklist.stage.maxFailedExecutorsPerNode = 2
>>>>    5. spark.blacklist.application.maxFailedTasksPerExecutor = 2
>>>>    6. spark.blacklist.application.maxFailedExecutorsPerNode = 2
>>>>    7. spark.blacklist.timeout = 1 hour
>>>>
>>>> 2. *Kill blacklisted executors/nodes by default*. This feature was
>>>> added as part of SPARK-16554 and has been available since 2.2.0. This is a
>>>> follow-on feature to blacklisting: if an executor/node is blacklisted for
>>>> the application, Spark also kills it, terminating all running tasks on
>>>> that executor for faster failure recovery.
>>>>
>>>> 3. *Remove legacy blacklisting timeout config*
>>>> : spark.scheduler.executorTaskBlacklistTime
>>>>
>>>> Thanks,
>>>> Ankur
>>>>
>>>
>>>

Re: [DISCUSS] Enable blacklisting feature by default in 3.0

Posted by Ankur Gupta <an...@cloudera.com.INVALID>.
Thanks for your thoughts, Chris! Please find my responses below:

- Rather than a fixed timeout, could we do some sort of exponential
backoff? Start with a 10 or 20 second blacklist and increase from there?
The nodes with catastrophic errors should quickly hit long blacklist
intervals.
- +1, I like this idea. This will have some additional cost with respect to
tracking the interval for each executor/node, but it will certainly be very
useful.
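
To make that concrete, here is a rough standalone sketch (not actual Spark
code; BlacklistTracker only has a single fixed spark.blacklist.timeout
today, and the per-executor/node "strike" count below is exactly the extra
bookkeeping mentioned above):

  // Hypothetical backoff: double the blacklist interval on every
  // consecutive blacklisting of the same executor/node, capped at a
  // configurable maximum (e.g. the current spark.blacklist.timeout).
  def backoffMillis(priorBlacklistings: Int,
                    baseMillis: Long = 20L * 1000,      // start at 20 seconds
                    capMillis: Long = 60L * 60 * 1000   // cap at 1 hour
                   ): Long = {
    val exponent = math.min(priorBlacklistings, 20)     // avoid Long overflow
    math.min(baseMillis * (1L << exponent), capMillis)
  }

  // backoffMillis(0) == 20s, backoffMillis(1) == 40s, backoffMillis(2) == 80s,
  // ... so a node that keeps failing quickly climbs to the 1 hour cap.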

- Correct me if I'm wrong, but once a task fails on an executor, even if
maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count
against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask"
only adds to the executor failures. If the task recovers on the second
attempt on the same executor, there is no way to remove the failure. I'd
argue that if the task succeeds on a second attempt on the same executor,
then it is definitely transient and the first attempt's failure should not
count towards the executor's total stage/application failure count.
- I am not sure about this. I think the purpose of blacklisting is to find
nodes with transient failures as well and blacklist them for a short period
of time to avoid re-computation. So, it will be useful to count a failure
against an executor even if the task successfully recovered from that failure
later on. And with the exponential backoff, blacklisting will be transient
in nature, so it will not be a huge penalty if that failure was truly
transient.

- W.r.t. turning it on by default: Do we have a sense of how many teams are
using blacklisting today with the current default settings? It may be
worth changing the defaults for a release or two and gathering feedback to
help make a call on turning it on by default. We could potentially get that
feedback now with a two-question survey: "Have you enabled blacklisting?"
and "What settings did you use?"
- I think this email was intended for that purpose. Additionally, from the
comments on my PR: https://github.com/apache/spark/pull/24208, it seems
some teams have that enabled by default already.


Re: [DISCUSS] Enable blacklisting feature by default in 3.0

Posted by Chris Stevens <ch...@databricks.com>.
Hey Ankur,

I think the significant decrease in "spark.blacklist.timeout" (1 hr down to
5 minutes) in your updated suggestion is the key here.

Looking at a few *successful* runs of the application I was debugging, here
are the error rates when I did *not* have blacklisting enabled:

Run A: 8 executors with 36 total errors over the last 25 minutes of a 1
hour and 6 minute run.
Run B: 8 executors with 50 total errors over the last 30 minutes of a 1
hour run.

Increasing "spark.blacklist.application.maxFailedTasksPerExecutor" to 5
would have allowed run A (~3 failures/executor) to pass, but run B (~6
failures/executor) would not have passed without the change to
"spark.blacklist.timeout".

With such a small timeout of 5 minutes, the worst you get is executors
flipping between blacklisted and not blacklisted (e.g. fail 5 tasks quickly
due to disk failures, wait 5 minutes, fail 5 tasks quickly, wait 5
minutes). For catastrophic errors, this is probably OK. The executor will
fail fast each time it comes back online and will effectively be
blacklisted 90+% of the time. For transient errors, the executor will come
back online and probably be fine. The only trouble you get into is if you
run out of executors for a stage due to a high number of transient errors,
but you're right, perhaps that many transient errors is something worth
failing for.
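
(To put a rough number on "90+%": assuming a bad executor burns through its
allowed failures within ~30 seconds of coming back online each time, which
is just an illustrative figure, it spends about

  300s blacklisted / (300s + 30s) per cycle ≈ 91%

of its time blacklisted.)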

In the case I was debugging with fetch failures, only the 5-minute timeout
applies, but I don't think it would have mattered. Fetch task attempts were
"hanging" for 30+ minutes without failing (it took that long for the netty
channel to time out). As such, there was no opportunity to blacklist. Even
reducing the number of fetch retry attempts didn't help, as the first
attempt occasionally stalled due to the underlying networking issues.
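
For reference, the knobs in that area look roughly like the following
(values here are only illustrative; as noted above, tuning the retry count
didn't help because the first attempt itself stalled):

  // Shuffle fetch retry and network timeout settings (Spark defaults are
  // 3 retries, 5s retry wait, 120s network timeout).
  val conf = new org.apache.spark.SparkConf()
    .set("spark.shuffle.io.maxRetries", "2")
    .set("spark.shuffle.io.retryWait", "5s")
    .set("spark.network.timeout", "120s")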

A few thoughts:
- Correct me if I'm wrong, but once a task fails on an executor, even if
maxTaskAttemptsPerExecutor > 1, that executor will get a failed task count
against it. It looks like "TaskSetBlacklist.updateBlacklistForFailedTask"
only adds to the executor failures. If the task recovers on the second
attempt on the same executor, there is no way to remove the failure. I'd
argue that if the task succeeds on a second attempt on the same executor,
then it is definitely transient and the first attempt's failure should not
count towards the executor's total stage/application failure count. (A rough
sketch of that bookkeeping follows after this list.)
- Rather than a fixed timeout, could we do some sort of exponential
backoff? Start with a 10 or 20 second blacklist and increase from there?
The nodes with catastrophic errors should quickly hit long blacklist
intervals.
- W.r.t. turning it on by default: Do we have a sense of how many teams are
using blacklisting today with the current default settings? It may be
worth changing the defaults for a release or two and gathering feedback to
help make a call on turning it on by default. We could potentially get that
feedback now with a two-question survey: "Have you enabled blacklisting?"
and "What settings did you use?"

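Here is a rough standalone sketch of the bookkeeping behind the first point
(made-up names, not the actual TaskSetBlacklist internals): count failures
per executor, but "forgive" a failure once a later attempt of the same task
succeeds on that executor.

  import scala.collection.mutable

  // Hypothetical per-stage failure tracking.
  class TaskFailureBookkeeping {
    // executor id -> indexes of tasks that have failed on it
    private val failedTasksByExecutor =
      mutable.Map.empty[String, mutable.Set[Int]]

    def onTaskFailed(exec: String, taskIndex: Int): Unit =
      failedTasksByExecutor.getOrElseUpdate(exec, mutable.Set.empty) += taskIndex

    // If the same task later succeeds on the same executor, treat the earlier
    // failure as transient and stop counting it toward blacklist thresholds.
    def onTaskSucceeded(exec: String, taskIndex: Int): Unit =
      failedTasksByExecutor.get(exec).foreach(_ -= taskIndex)

    def failureCount(exec: String): Int =
      failedTasksByExecutor.get(exec).map(_.size).getOrElse(0)
  }
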
-Chris
