Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2016/10/13 15:45:20 UTC

[jira] [Comment Edited] (SPARK-17911) Scheduler does not need messageScheduler for ResubmitFailedStages

    [ https://issues.apache.org/jira/browse/SPARK-17911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15572289#comment-15572289 ] 

Imran Rashid edited comment on SPARK-17911 at 10/13/16 3:44 PM:
----------------------------------------------------------------

Copying the earlier discussion on the PR here

from squito:
bq. I find myself frequently wondering about the purpose of this. It's commented very tersely on RESUBMIT_TIMEOUT, but I think it might be nice to add a longer comment here. I guess something like
bq. "If we get one fetch-failure, we often get more fetch failures across multiple executors. We will get better parallelism when we resubmit the mapStage if we can resubmit when we know about as many of those failures as possible. So this is a heuristic to add a small delay to see if we gather a few more failures before we resubmit."
bq. We do not need the delay to figure out exactly which shuffle-map outputs are gone on the executor -- we always mark the executor as lost on a fetch failure, which means we mark all its map output as gone. (This is really confusing -- it looks like we only remove the one shuffle-map output that was involved in the fetch failure, but then the entire removal is buried inside another method a few lines further.)
bq. I did some browsing through history, and there used to be this comment
{noformat}
     // Periodically resubmit failed stages if some map output fetches have failed and we have
     // waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
     // tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
     // the same time, so we want to make sure we've identified all the reduce tasks that depend
     // on the failed node.
{noformat}
bq. at least in the current version, this also sounds like a bad reason to have the delay. failedStage won't be resubmitted till mapStage completes anyway, and then it'll look to see what tasks it is missing. Adding a tiny delay on top of the natural delay for mapStage seems pretty pointless.
bq. I don't even think that the reason I gave in my suggested comment is a good one -- do you really expect failures in multiple executors? But it is at least logically consistent.
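
The point above about executor loss can be illustrated with a toy model. This is only a sketch under stated assumptions, not DAGScheduler code: the function `handle_fetch_failure`, the `map_outputs` dictionary, and the executor ids are all hypothetical names made up for illustration. It shows that if a single fetch failure marks the whole executor as lost, every map output on that executor is dropped at once, so no waiting period is needed to discover which outputs are gone.

```python
def handle_fetch_failure(map_outputs, failed_map_id):
    """Toy model of "a fetch failure marks the whole executor as lost".

    map_outputs: dict mapping a map task id to the (hypothetical) id of the
    executor that holds its shuffle output. The dict is mutated in place.
    Returns the sorted list of map ids whose output was removed.
    """
    lost_executor = map_outputs[failed_map_id]
    # Every output on the lost executor is gone, not just the one that failed.
    lost = sorted(m for m, e in map_outputs.items() if e == lost_executor)
    for m in lost:
        del map_outputs[m]
    return lost


outputs = {0: "exec-1", 1: "exec-2", 2: "exec-1", 3: "exec-3"}
print(handle_fetch_failure(outputs, 0))  # [0, 2]
print(outputs)                           # {1: 'exec-2', 3: 'exec-3'}
```

In this model a single failure reported against map 0 is enough to know that map 2 must be recomputed as well, which is the reason given above for why the delay is not needed for that purpose.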

from markhamstra
bq. I don't like "Periodically" in your suggested comment, since this is a one-shot action after a delay of RESUBMIT_TIMEOUT milliseconds.
bq. I agree that this delay-before-resubmit logic is suspect. If we are both thinking correctly that a 200 ms delay on top of the time to re-run the mapStage is all but inconsequential, then removing it in this PR would be fine. If there are unanticipated consequences, though, I'd prefer to have that change in a separate PR.

from squito
bq. yeah probably a separate PR, sorry this was just an opportunity for me to rant :)
bq. And sorry if I worded it poorly, but I was not suggesting the one w/ "Periodically" as a better comment -- in fact I think it's a bad comment, just wanted to mention it was another description which used to be there long ago.
bq. This was my suggestion:
bq. If we get one fetch-failure, we often get more fetch failures across multiple executors. We will get better parallelism when we resubmit the mapStage if we can resubmit when we know about as many of those failures as possible. So this is a heuristic to add a small delay to see if we gather a few more failures before we resubmit.

from markhamstra
bq. Ah, sorry for ascribing the prior comment to your preferences. That comment actually did make sense a long time ago when the resubmitting of stages really was done periodically by an Akka scheduled event that fired every something seconds. I'm pretty sure the RESUBMIT_TIMEOUT stuff is also legacy code that doesn't make sense and isn't necessary any more.
bq. So, do you want to do the follow-up PR to get rid of it, or shall I?
bq. BTW, nothing wrong with your wording -- but my poor reading can create misunderstanding of even the clearest text.

from squito
bq. if you are willing, could you please file the follow up? I am bouncing between various things in my backlog -- though that change is small, I have a feeling it will merit extra discussion as a risky change, would be great if you drive it

from markhamstra
bq. Ok, I can get started on that. I believe that leaves this PR ready to merge.

from markhamstra
bq. I think that I am going to backtrack on creating a new PR, because I think that the RESUBMIT_TIMEOUT actually does still make sense.
bq. If we go way back in DAGScheduler history (https://github.com/apache/spark/blob/branch-0.8/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala) we'll find that we had an event queue that was polled every 10 millis (POLL_TIMEOUT) and that fetch failures didn't produce separate resubmit tasks events, but rather we called resubmitFailedStages within the event polling/handling loop if 50 millis (RESUBMIT_TIMEOUT) had passed since the last time a FetchFailed was received in a CompletionEvent. The handling of a FetchFailure didn't have any check of whether a failed stage was already in failedStages or any other kind of heuristic as to whether a resubmit was already scheduled for a failed stage, so the RESUBMIT_TIMEOUT was effectively the only thing preventing multiple resubmits from occurring as quickly as every 10 ms.
bq. RESUBMIT_TIMEOUT has persisted through the multiple iterations of the DAGScheduler since then, and now it is effectively making sure that the time between ResubmitFailedStages events is at least 200 ms. The time to actually complete any of the stages in failedStages when the ResubmitFailedStages event is handled doesn't really come into play, since failedStages is cleared within resubmitFailedStages and that method returns as soon as the stages that were queued up are resubmitted, not actually done re-calculating. In other words, handling a ResubmitFailedStages event should be quick, and causes failedStages to be cleared, allowing the next ResubmitFailedStages event to be posted from the handling of another FetchFailed. If there are the expected lot of fetch failures for a single stage, and there is no RESUBMIT_TIMEOUT, then it is quite likely that there will be a burst of resubmit events (and corresponding log messages) and submitStage calls made in rapid succession.
bq. I think I'm inclined to keep the RESUBMIT_TIMEOUT throttle.
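
The throttling argument above can be checked with a small event simulation. This is a minimal sketch under assumptions, not the actual DAGScheduler logic: it assumes that handling a FetchFailed posts a delayed ResubmitFailedStages event only when failedStages is empty, and that handling ResubmitFailedStages clears failedStages immediately, as described in the comment. The function `simulate`, the stage labels, and the failure timestamps are hypothetical.

```python
import heapq


def simulate(fetch_failure_times_ms, resubmit_delay_ms):
    """Return the times (ms) at which ResubmitFailedStages is handled.

    Assumed model: a FetchFailed schedules a resubmit only when failed_stages
    is empty; handling the resubmit clears failed_stages right away.
    """
    # Each event is (time, sequence number, kind); the unique sequence number
    # breaks ties so that event kinds are never compared.
    events = [(t, seq, "FetchFailed")
              for seq, t in enumerate(fetch_failure_times_ms)]
    heapq.heapify(events)
    next_seq = len(events)
    failed_stages = set()
    resubmit_times = []
    while events:
        now, _, kind = heapq.heappop(events)
        if kind == "FetchFailed":
            if not failed_stages:
                heapq.heappush(
                    events,
                    (now + resubmit_delay_ms, next_seq, "ResubmitFailedStages"))
                next_seq += 1
            failed_stages.add("failedStage")
            failed_stages.add("mapStage")
        else:
            # Handling is quick: the set is cleared before any stage finishes.
            failed_stages.clear()
            resubmit_times.append(now)
    return resubmit_times


failures = [0, 5, 20, 60, 150, 210, 260]
print(simulate(failures, 200))  # [200, 410]
print(simulate(failures, 0))    # [0, 5, 20, 60, 150, 210, 260]
```

Under these assumptions, a 200 ms delay folds seven fetch failures into two resubmit events, while a zero delay produces one resubmit event per failure, which matches the burst of resubmit events that the comment expects without RESUBMIT_TIMEOUT.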



> Scheduler does not need messageScheduler for ResubmitFailedStages
> -----------------------------------------------------------------
>
>                 Key: SPARK-17911
>                 URL: https://issues.apache.org/jira/browse/SPARK-17911
>             Project: Spark
>          Issue Type: Improvement
>          Components: Scheduler
>    Affects Versions: 2.0.0
>            Reporter: Imran Rashid
>
> It's not totally clear what the purpose of the {{messageScheduler}} is in {{DAGScheduler}}.  It can perhaps be eliminated completely; or perhaps we should just clearly document its purpose.
> This comes from a long discussion w/ [~markhamstra] on an unrelated PR here: https://github.com/apache/spark/pull/15335/files/c80ad22a242255cac91cce2c7c537f9b21100f70#diff-6a9ff7fb74fd490a50462d45db2d5e11
> But it's tricky, so I am breaking it out here to archive the discussion.
> Note: this issue requires a decision on what to do before a code change, so let's just discuss it on jira first.


