You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-issues@hadoop.apache.org by "Carlo Curino (JIRA)" <ji...@apache.org> on 2013/05/23 02:05:20 UTC

[jira] [Commented] (MAPREDUCE-5269) Preemption of Reducer (and Shuffle) via checkpointing

    [ https://issues.apache.org/jira/browse/MAPREDUCE-5269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13664720#comment-13664720 ] 

Carlo Curino commented on MAPREDUCE-5269:
-----------------------------------------

This patch completes the series MAPREDUCE-5176,MAPREDUCE-5189,MAPREDUCE-5192,MAPREDUCE-5194,MAPREDUCE-5196,MAPREDUCE-5197 and provides the actual checkpoint mechanics for shuffle and reducers. In Task we propagate the requests coming in form the umbilical protocol to preempt the task and propagate them into the reducer and shuffle logic.

--- Saving the computation state to checkpoint ---
For Shuffle we leverage the plugin architecture to provide a PreemptableShuffle. The key intuition behind this code is to: 
# bring the fetcher threads to a safe halt (based on MAPREDUCE-5194), 
# take note of what are the map that have been pulled so far
# finish the local sort of such maps
# write (using MAPREDUCE-5197) a checkpoint file that contains a header with the list of map ids, and the sorted K,V pairs. This is a modified IFile.

For Reducers we check that the Reducer and OutputCommitter are tagged as @Preemptable by the user (see discussion on MAPREDUCE-5176) and:
# wait for the execution of the user UDF to complete for a key-group
# promote the output produced so far, so that is not removed during cleanup (this leverages a modified version of FileOutpuCommitter: the PartialOutputCommitter)
# write (using MAPREDUCE-5197) in a checkpoint the remaining portion of the data to be processed (same format as for shuffle, where the header contains all the map ids, and the K,V pairs store the data not reducerd yet)

--- Restarting from a checkpoint ---
For both Shuffle and Reducer checkpoints we simply:
# during the init of the PreemptableShuffle we check on whether there is a checkpoint for this task
# if one exists we initialized the list of finishedMaps in the shuffle scheduler to what has been saved in the checkpoint
# we add the IFile contained in the checkpoint to the set of files we are shuffling
# proceed with normal shuffle

--- Failures ---
We detect problems with the checkpoints during restart, and if the checkpoint is corrupted/unavailable or the partially committed output is missing
we completely reset the execution for this task by wiping the partially committed output and restarting from scratch the execution of the task (basically
we fallback to a classic task re-execution). Beside unit tests, and regular runs, we validated this recovery mechanism by injecting faults (missed propagations
of checkpoint ids, missing checkpoint files, missing output files) and observed the system behaving properly. 

(please direct conversations around when to tag a reducer as @Preemptable and how to support stateful reducers to MAPREDUCE-5176)


                
> Preemption of Reducer (and Shuffle) via checkpointing
> -----------------------------------------------------
>
>                 Key: MAPREDUCE-5269
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-5269
>             Project: Hadoop Map/Reduce
>          Issue Type: Improvement
>          Components: mrv2
>            Reporter: Carlo Curino
>         Attachments: MAPREDUCE-5269.patch
>
>
> This patch tracks the changes in the task runtime (shuffle, reducer context, etc.) that are required to implement checkpoint-based preemption of reducer tasks.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira