Posted to dev@spark.apache.org by mateiz <gi...@git.apache.org> on 2014/02/24 08:51:10 UTC

[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

GitHub user mateiz opened a pull request:

    https://github.com/apache/incubator-spark/pull/641

    SPARK-1124: Fix infinite retries of reduce stage when a map stage failed

    In the previous code, if you had a failing map stage and then tried to run reduce stages on it repeatedly, the first reduce stage would fail correctly, but the later ones would mistakenly believe that all map outputs are available and start failing infinitely with fetch failures from "null". See https://spark-project.atlassian.net/browse/SPARK-1124 for an example.
    
    This PR also makes small style cleanups: it renames a variable named "s" and simplifies some awkward map manipulation.
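A minimal Scala sketch of the failure mode described above. It assumes that deserialized map statuses arrive as an array in which a missing output is null, as the diff comment "locs(i) will be null if missing" later in this thread states. String stands in for Spark's MapStatus, and OutputLocsSketch and its methods are hypothetical names, not Spark APIs.

```scala
// Hypothetical sketch: String stands in for Spark's MapStatus; null marks a missing output.
object OutputLocsSketch {
  // Old behaviour: List(null) is a one-element list, so a missing output
  // still looks available, and the counter is simply the array length.
  def oldOutputLocs(locs: Array[String]): Array[List[String]] =
    locs.map(loc => List(loc))
  def oldNumAvailable(locs: Array[String]): Int = locs.size

  // Fixed behaviour: Option(null) is None, so a missing output becomes Nil,
  // and only non-null entries are counted as available.
  def newOutputLocs(locs: Array[String]): Array[List[String]] =
    locs.map(loc => Option(loc).toList)
  def newNumAvailable(locs: Array[String]): Int = locs.count(_ != null)
}
```

With locs = Array("host1", null, "host3"), the old path reports 3 available outputs and a non-empty location list for partition 1. The fixed path reports 2 and an empty list, which is what lets the scheduler see that the map stage is incomplete instead of fetching from "null".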

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mateiz/incubator-spark spark-1124-master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-spark/pull/641.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #641
    
----
commit cd32d5e4dee1291e4509e5965322b7ffe620b1f3
Author: Matei Zaharia <ma...@databricks.com>
Date:   2014-02-24T07:45:48Z

    SPARK-1124: Fix infinite retries of reduce stage when a map stage failed
    
    In the previous code, if you had a failing map stage and then tried to
    run reduce stages on it repeatedly, the first reduce stage would fail
    correctly, but the later ones would mistakenly believe that all map
    outputs are available and start failing infinitely with fetch failures
    from "null".

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. To do so, please top-post your response.
If your project does not have this feature enabled and wishes so, or if the
feature is enabled but not working, please contact infrastructure at
infrastructure@apache.org or file a JIRA ticket with INFRA.
---

[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35939005
  
    Merged build started.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35946250
  
    Merged build finished.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35865525
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12826/



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35864121
  
    Merged build started.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10012275
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
    --- End diff --
    
    No big deal either way. It's just that when I see a fully bound pattern such as (k, v), I expect every bound name to be used, whereas if the pattern is only partially bound, as in (k, _), I know that the unbound parts won't be used. In this case, the relevant code is short and obvious enough that there's not much difference.
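To make the distinction concrete, here is a small sketch of the two spellings under discussion. String stands in for Stage and a plain mutable.HashMap stands in for shuffleToMapStage; PatternBindingSketch is a hypothetical name. Both methods behave identically; the partially bound form only signals that the value is unused.

```scala
import scala.collection.mutable

// Hypothetical sketch: String stands in for Stage, Int keys for shuffle IDs.
object PatternBindingSketch {
  // Fully bound pattern: v is named but never used.
  // find stops at the first entry whose value equals stage, as in the diff.
  def removeFullyBound(m: mutable.HashMap[Int, String], stage: String): Unit =
    for ((k, v) <- m.find(_._2 == stage)) {
      m.remove(k)
    }

  // Partially bound pattern: `_` tells the reader the value is deliberately ignored.
  def removePartiallyBound(m: mutable.HashMap[Int, String], stage: String): Unit =
    for ((k, _) <- m.find(_._2 == stage)) {
      m.remove(k)
    }
}
```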



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10011948
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
    --- End diff --
    
    Haha, I wanted to avoid lots of underscores, but I can do it if you think it's more idiomatic.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r9997152
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    --- End diff --
    
    At this point, the need is to clean up the DAGScheduler's data structures that reference the removed stage. If you are 100% certain that the stage's notion of shuffleDeps contains every shuffleId that shuffleToMapStage is tracking for the stage, then you can take the simpler route you have, working from the stage's understanding. I wasn't 100% certain of that (which is not the same as saying I have a good reason to believe the stage's understanding of shuffleDeps will diverge from shuffleToMapStage's), so I took the safer route of working from what shuffleToMapStage knows instead of from what the stage knows.
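A sketch of how the two cleanup routes can diverge. It assumes, as mateiz suggests later in the thread, that after a failure a newer stage may be registered under the same shuffle ID. MapStage, its fields, and ShuffleCleanupSketch are hypothetical stand-ins, not Spark's Stage or DAGScheduler.

```scala
import scala.collection.mutable

object ShuffleCleanupSketch {
  // Hypothetical stand-in for a map stage: an id plus the shuffle it writes, if any.
  case class MapStage(id: Int, shuffleId: Option[Int])

  // Route 1: trust the stage's own notion of its shuffle dependency.
  def removeByStageKnowledge(shuffleToMapStage: mutable.HashMap[Int, MapStage],
                             stage: MapStage): Unit =
    for (shuffleId <- stage.shuffleId) {
      shuffleToMapStage.remove(shuffleId)
    }

  // Route 2: remove only the entries that actually point at this stage.
  // Keys are collected into a List first so the map is not mutated while iterating.
  def removeByMapKnowledge(shuffleToMapStage: mutable.HashMap[Int, MapStage],
                           stage: MapStage): Unit = {
    val keys = shuffleToMapStage.iterator.collect { case (k, s) if s == stage => k }.toList
    keys.foreach(k => shuffleToMapStage.remove(k))
  }
}
```

If shuffle 7 has been re-pointed from an old stage to a newer one, removing the old stage by its own shuffle ID also drops the newer stage's entry, while removing by value leaves it in place.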



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r9996518
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    Doing so would introduce both a bug and a logging error.  Removing an empty Set from pendingTasks is normal, expected and desired behavior at this point.  And only unexpected clean-up is intended to be logged.
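A minimal sketch of the intended semantics, assuming pendingTasks maps a stage to its set of outstanding tasks. String and Int stand in for Stage and Task, and the method returns the would-be debug messages instead of calling logDebug so the behaviour is easy to check; PendingTasksCleanupSketch and removePending are hypothetical names.

```scala
import scala.collection.mutable

// Hypothetical sketch: String stands in for Stage, Int for Task.
object PendingTasksCleanupSketch {
  // Returns the debug messages that would be logged.
  def removePending(pendingTasks: mutable.HashMap[String, mutable.HashSet[Int]],
                    stage: String): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    // Only a non-empty task set is unexpected at this point, so only it is logged.
    if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
      log += s"Removing pending status for stage $stage"
    }
    // Always drop the entry; removing an empty set (or an absent key) is expected.
    pendingTasks -= stage
    log.toList
  }
}
```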



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10006949
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -272,8 +272,10 @@ class DAGScheduler(
         if (mapOutputTracker.has(shuffleDep.shuffleId)) {
           val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
           val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    -      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
    -      stage.numAvailableOutputs = locs.size
    +      for (i <- 0 until locs.size) {
    +        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
    --- End diff --
    
    Yup, as long as the underlying data structure is an array of length numTasks, null is as good as any other "missing output" flag.  Changing to a Map containing only valid locations instead of an array might make sense if the need to get the output statuses grows beyond the present cases in MapOutputTracker and DAGScheduler (and if the performance difference is acceptable), but now that we're handling the nulls properly in those existing cases, we're good at least for now. 
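A sketch of the alternative representation mentioned above: converting the dense array (null marks a missing output) into a map that holds only valid locations. String stands in for MapStatus, and SparseLocsSketch is a hypothetical name; whether the extra allocation is acceptable in Spark is exactly the performance question raised in the comment.

```scala
// Hypothetical sketch: String stands in for MapStatus.
object SparseLocsSketch {
  // Dense form: array of length numTasks, null marks a missing output.
  // Sparse form: partition index -> location, holding only valid entries.
  def toSparse(locs: Array[String]): Map[Int, String] =
    locs.zipWithIndex.collect { case (loc, i) if loc != null => i -> loc }.toMap

  // Partitions whose output is missing, derived from the sparse form.
  def missingPartitions(sparse: Map[Int, String], numTasks: Int): Seq[Int] =
    (0 until numTasks).filterNot(i => sparse.contains(i))
}
```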



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000699
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    I should elaborate - I made the assumption that the change was obvious.
    
    if (pendingTasks.contains(stage)) {
      if (!pendingTasks(stage).isEmpty) {
        logDebug("Removing pending status for stage %d".format(stageId))
      }
      pendingTasks -= stage
    }
    
    Update pendingTasks only if it is known to contain the stage.
    I am not sure how common it is for pendingTasks not to contain the stage, but I would assume it is fairly common, so this avoids an unnecessary re-search.
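A runnable version of the nested form suggested here, with String and Int standing in for Stage and Task and the returned messages standing in for logDebug; NestedCleanupSketch is a hypothetical name. For a present key it ends in the same state as removing unconditionally; the only difference is that -= is skipped when the key is absent.

```scala
import scala.collection.mutable

// Hypothetical sketch: String stands in for Stage, Int for Task.
object NestedCleanupSketch {
  // Only touch pendingTasks when it is known to contain the stage.
  def removePendingNested(pendingTasks: mutable.HashMap[String, mutable.HashSet[Int]],
                          stage: String): List[String] = {
    val log = mutable.ListBuffer.empty[String]
    if (pendingTasks.contains(stage)) {
      if (!pendingTasks(stage).isEmpty) {
        log += s"Removing pending status for stage $stage"
      }
      pendingTasks -= stage
    }
    log.toList
  }
}
```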



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/incubator-spark/pull/641



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35963256
  
    Merged into master and branch-0.9.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35864120
  
    Merged build triggered.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000592
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    I am not sure we are talking about the same thing here...
    I wanted 'pendingTasks -= stage' to be moved inside the if() block.
    I don't see how that is a behavior change or a bug.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000819
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    That works, but splitting the conditional is probably less performant than asking the Set to remove an element that it doesn't contain.  Either way, it's a trivial difference.
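One more option, not proposed in this thread: mutable.HashMap.remove returns the removed value as an Option, so a single lookup both drops the entry and reveals whether it held leftover tasks. This is a sketch with String and Int standing in for Stage and Task; SingleLookupSketch is a hypothetical name, and the returned message stands in for logDebug.

```scala
import scala.collection.mutable

// Hypothetical sketch: String stands in for Stage, Int for Task.
object SingleLookupSketch {
  // remove returns Some(value) if the key was present, None otherwise,
  // so one call both removes the entry and tells us what was there.
  def removePending(pendingTasks: mutable.HashMap[String, mutable.HashSet[Int]],
                    stage: String): Option[String] =
    pendingTasks.remove(stage) match {
      case Some(tasks) if tasks.nonEmpty => Some(s"Removing pending status for stage $stage")
      case _ => None // absent key or empty task set: nothing unexpected to log
    }
}
```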



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r9996859
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -272,8 +272,10 @@ class DAGScheduler(
         if (mapOutputTracker.has(shuffleDep.shuffleId)) {
           val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
           val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    -      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
    -      stage.numAvailableOutputs = locs.size
    +      for (i <- 0 until locs.size) {
    +        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
    --- End diff --
    
    Ok, it was a bad assumption on my part that the mapOutputTracker would only be returning valid locations, and this PR does address the possibility of null values here.  However, it feels to me like it is addressing the symptom instead of the cause.  Wouldn't we be better off either not putting nulls into the MapOutputTracker in the first place, or at least not returning them when getting map output statuses?  As it stands, null checks are probably needed everywhere the output statuses are fetched.
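One way to address the cause rather than the symptom, sketched under assumptions: convert the raw statuses to Options once at the boundary, so callers are forced by the type to handle a missing output instead of remembering a null check. String stands in for MapStatus; SafeStatusesSketch and its methods are hypothetical, and this is not the MapOutputTracker API.

```scala
// Hypothetical sketch: String stands in for MapStatus; null marks a missing output.
object SafeStatusesSketch {
  // Convert once at the boundary: null becomes None, anything else Some(status).
  def fromRaw(raw: Array[String]): IndexedSeq[Option[String]] =
    raw.toIndexedSeq.map(s => Option(s))

  // Downstream code can no longer mistake a missing output for an available one.
  def allAvailable(statuses: IndexedSeq[Option[String]]): Boolean =
    statuses.forall(_.isDefined)
}
```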



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10001290
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    On second thoughts, this is a fairly infrequent code path - not the common path I assumed it was - so ignore the comment.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35885498
  
    Looks good, nice catch!



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35865523
  
    Merged build finished.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000899
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    That is surprising!
    I would assume searching for an entry in a Set takes at least a few dozen opcodes, if not more. Even if the entry was already searched for (so the bpt's are favourable), it would still be much more expensive than splitting an if. I have not looked into Scala's implementation, but assuming a reasonable implementation of 'if', this is definitely unexpected :-)



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35939003
  
    Merged build triggered.



[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10004187
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    --- End diff --
    
    Good point, maybe I'll change this back. It seemed that each stage is added using its own shuffleID, but perhaps with failures we will sometimes replace it with a newer stage and then we'd be removing that one.


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000990
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    To add: this was just a suggestion, since the code was already changing there and the result would be more readable.
    Punting on it is perfectly fine; not a big deal.


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r9990785
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    Unrelated, but we could move this into the condition above; the checks are done anyway!


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35939422
  
    Okay, I think I've fixed the shuffleToMapStage removal part: https://github.com/mateiz/incubator-spark/commit/0187cef0f284e6cb22cb3986c327c43304daf57d.


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10003953
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -272,8 +272,10 @@ class DAGScheduler(
         if (mapOutputTracker.has(shuffleDep.shuffleId)) {
           val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
           val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
    -      for (i <- 0 until locs.size) stage.outputLocs(i) = List(locs(i))
    -      stage.numAvailableOutputs = locs.size
    +      for (i <- 0 until locs.size) {
    +        stage.outputLocs(i) = Option(locs(i)).toList   // locs(i) will be null if missing
    --- End diff --
    
    Yeah, the problem is that the MapOutputTracker returns an array of MapStatus, where position i of the array is supposed to be the location of the output of task i. This array contains nulls initially to denote a missing location, because a MapStatus can only indicate a real location. So I think that either way we'll have to check for some "missing output" condition.
    
    With this fix I believe we won't be launching reduce stages when some maps are missing anymore, so we should be okay.
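A hedged Scala sketch of the null-handling described above. `FakeMapStatus` is a hypothetical stand-in for `MapStatus`, and `OutputLocs` is an illustrative helper, not Spark code. `Option(x)` yields `None` when `x` is null, so `Option(locs(i)).toList` turns a missing slot into `Nil` rather than `List(null)`. The pre-patch line `stage.numAvailableOutputs = locs.size` counted null slots as available. The diff hunk above stops before the patched count line, so the counting helper here is an assumption about how the count could be derived, not the merged code.

```scala
// Hypothetical stand-in for MapStatus: just the host holding one map task's output.
final case class FakeMapStatus(host: String)

object OutputLocs {
  // Same idea as the patched loop: a null slot (missing map output) becomes Nil
  // instead of List(null), which would look like a real location.
  def toOutputLocs(locs: Array[FakeMapStatus]): Array[List[FakeMapStatus]] =
    locs.map(loc => Option(loc).toList)

  // Counts only slots that hold a location; using locs.size would also count the nulls.
  def numAvailableOutputs(locs: Array[FakeMapStatus]): Int =
    locs.count(_ != null)
}
```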


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10000970
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for (shuffleDep <- stage.shuffleDep) {
    +                  shuffleToMapStage.remove(shuffleDep.shuffleId)
                     }
    -                stageToInfos -= s
    -                shuffleToMapStage.keys.filter(shuffleToMapStage(_) == s).foreach(shuffleId =>
    -                  shuffleToMapStage.remove(shuffleId))
    -                if (pendingTasks.contains(s) && !pendingTasks(s).isEmpty) {
    +                if (pendingTasks.contains(stage) && !pendingTasks(stage).isEmpty) {
                       logDebug("Removing pending status for stage %d".format(stageId))
                     }
    -                pendingTasks -= s
    --- End diff --
    
    And the expected, common case when a stage is being removed is for pendingTasks to have an empty Set of tasks associated with that stage.
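Folding `pendingTasks -= s` directly into the non-empty check would skip the removal in the common empty-set case. One way to merge the two without that problem relies on `mutable.HashMap.remove`, which deletes the entry and returns the old value as an `Option`. A single call can therefore do the removal and feed the log condition. This is a sketch with simplified, hypothetical types (`Int` stage IDs, `String` tasks, `println` standing in for `logDebug`), not the code that was merged.

```scala
import scala.collection.mutable

object PendingTasksCleanup {
  // Hypothetical simplification: stage ids are Ints and tasks are Strings.
  // Always removes the stage's entry, and returns true only when the removed task
  // set was non-empty, i.e. when the "Removing pending status" message applies.
  def removePending(pendingTasks: mutable.HashMap[Int, mutable.HashSet[String]], stageId: Int): Boolean =
    pendingTasks.remove(stageId) match {
      case Some(tasks) if tasks.nonEmpty =>
        println("Removing pending status for stage %d".format(stageId)) // stands in for logDebug
        true
      case _ =>
        false
    }
}
```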


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/incubator-spark/pull/641#issuecomment-35946252
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12832/


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10012382
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
    --- End diff --
    
    Alright, I'll probably leave it as is then, but thanks for taking a look.


[GitHub] incubator-spark pull request: SPARK-1124: Fix infinite retries of ...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/641#discussion_r10011413
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -373,25 +375,26 @@ class DAGScheduler(
               } else {
                 def removeStage(stageId: Int) {
                   // data structures based on Stage
    -              stageIdToStage.get(stageId).foreach { s =>
    -                if (running.contains(s)) {
    +              for (stage <- stageIdToStage.get(stageId)) {
    +                if (running.contains(stage)) {
                       logDebug("Removing running stage %d".format(stageId))
    -                  running -= s
    +                  running -= stage
    +                }
    +                stageToInfos -= stage
    +                for ((k, v) <- shuffleToMapStage.find(_._2 == stage)) {
    --- End diff --
    
    nit: for ((k, _) <- ...)
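The nit applied to the loop from the diff, written as a small generic helper over a `mutable.HashMap`. The helper name `removeMappingFor` is hypothetical. `find` returns the matching pair as an `Option` before anything is removed, so the map is not modified while it is being traversed. Because `find` stops at the first match, this removes at most one entry. That matches the old `keys.filter(...)` version only if each stage is registered under a single shuffle ID, which is an assumption here.

```scala
import scala.collection.mutable

object FindAndRemove {
  // Removes the first entry whose value equals `stage`. The value is bound to `_`
  // because only the key is needed, as the nit suggests.
  def removeMappingFor[K, V](map: mutable.HashMap[K, V], stage: V): Unit = {
    for ((k, _) <- map.find(_._2 == stage)) {
      map.remove(k)
    }
  }
}
```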

