Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/08/15 00:28:44 UTC

[GitHub] [spark] vanzin commented on issue #24892: [SPARK-25341][Core] Support rolling back a shuffle map stage and re-generate the shuffle files

URL: https://github.com/apache/spark/pull/24892#issuecomment-521468312
 
 
   So I'm trying to page in enough context about all this, but I can't shake the feeling that I'm missing something about why speculative tasks in non-deterministic stages are safe.
   
   The code that triggers this concern is the following, in `DAGScheduler.handleTaskCompletion`:
   
    ```scala
             case smt: ShuffleMapTask =>
               val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
               shuffleStage.pendingPartitions -= task.partitionId
               val status = event.result.asInstanceOf[MapStatus]
               val execId = status.location.executorId
               logDebug("ShuffleMapTask finished on " + execId)
               if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
                 logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
               } else {
                 // The epoch of the task is acceptable (i.e., the task was launched after the most
                 // recent failure we're aware of for the executor), so mark the task's output as
                 // available.
                 mapOutputTracker.registerMapOutput(
                   shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
               }
   ```
   
   That seems to be blindly overwriting an existing task's output with the new one.  Wouldn't that mean that a speculative task could replace the output of another task after the stage has finished (and thus after the next stage started running)? 
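   As a self-contained illustration of the race (this is a made-up model, not Spark code; all names here are hypothetical), an unconditional put into the map-output registry lets a late speculative completion silently replace the location that a downstream reducer may already have fetched from:

   ```scala
   // Minimal model of the race described above. `registerMapOutput` mirrors
   // the unconditional overwrite in the quoted snippet; the names are
   // hypothetical, not Spark's.
   object SpeculativeOverwrite {
     // partition id -> location of that partition's shuffle output
     val mapOutputs = scala.collection.mutable.Map[Int, String]()

     def registerMapOutput(partition: Int, location: String): Unit =
       mapOutputs(partition) = location // blindly overwrites any prior entry

     def main(args: Array[String]): Unit = {
       registerMapOutput(0, "exec-1")   // original task finishes; stage completes
       val readBy = mapOutputs(0)       // next stage starts fetching from exec-1
       registerMapOutput(0, "exec-2")   // late speculative copy reports in
       // The registry now disagrees with what the running reducer already fetched.
       println(s"reducer fetched from $readBy, registry now says ${mapOutputs(0)}")
     }
   }
   ```

   If the stage is non-deterministic, the two copies of partition 0 need not even contain the same data, so the disagreement is not merely cosmetic.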
   
   The stage is marked as finished as soon as there are output blocks for all the partitions, and at that point there may still be speculative tasks that haven't reported back. I believe in that case the driver makes an effort to kill them, but what if that task result arrives first?
   
   (There's a check on the very next line, `if (runningStages.contains(shuffleStage)...)`, which feels like it should also be applied to the `mapOutputTracker` call I mention above.)
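   To make that suggestion concrete (a hedged sketch only, with made-up names, not a proposed patch): the idea is to record a completed map task's output only while its stage is still in `runningStages`, and drop late results otherwise:

   ```scala
   // Hypothetical sketch of the suggested guard: register a finished
   // ShuffleMapTask's output only while its stage is still running.
   // All names here are illustrative, not Spark's actual API.
   object GuardedRegistration {
     val runningStages = scala.collection.mutable.Set[Int]()
     val mapOutputs = scala.collection.mutable.Map[Int, String]()

     // Returns true if the result was recorded, false if it was ignored.
     def handleCompletion(stageId: Int, partition: Int, location: String): Boolean =
       if (runningStages.contains(stageId)) {
         mapOutputs(partition) = location // stage still running: safe to record
         true
       } else {
         false // stage already finished: drop the late (likely speculative) result
       }

     def main(args: Array[String]): Unit = {
       runningStages += 1
       println(handleCompletion(1, 0, "exec-1")) // recorded: prints true
       runningStages -= 1                        // stage marked finished
       println(handleCompletion(1, 0, "exec-2")) // ignored: prints false
     }
   }
   ```

   Whether ignoring the late result is always the right call (versus, say, invalidating and rolling back) is exactly the kind of question this PR is wrestling with.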
   
   Sorry if I'm missing something obvious. I need to spend more time to fully understand this. (I also realize that what I'm commenting on isn't necessarily caused by this particular PR, nor would it change with the latest suggestions; it looks like pre-existing behavior.)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services
