Posted to issues@spark.apache.org by "Steve Loughran (JIRA)" <ji...@apache.org> on 2017/07/15 15:12:00 UTC

[jira] [Commented] (SPARK-19790) OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure

    [ https://issues.apache.org/jira/browse/SPARK-19790?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16088630#comment-16088630 ] 

Steve Loughran commented on SPARK-19790:
----------------------------------------

I've now summarised the FileOutputCommitter v1 and v2 algorithms, as far as I can understand them from the source and some step-throughs; I think I'd need to add a few more tests to be really sure I understand it: [https://github.com/steveloughran/hadoop/blob/s3guard/HADOOP-13786-committer/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3a_committer_architecture.md]

h2. v1

# task commits by atomic rename() of {{$dest/_temporary/$appAttemptId/_temporary/$taskAttemptID}} to the completed dir {{$dest/_temporary/$jobAttempt/$taskAttempt}}; job commit moves all data under each task attempt into $dest. Note that as only one rename() to that destination can succeed, race conditions in speculative execution are prevented without the need for explicit coordination

# failure during task commit: delete the completed task directory (if any), rerun task.
# job commit: list completed tasks, move/merge them into the dest dir. Not atomic; the rename count scales with the number of tasks and the size of each directory tree. A failure in job commit is not recoverable.
# failure during job commit: job in unknown state, rm $dest and rerun

# job restart: after failure of the entire job, it can be restarted, with the restarted job reusing the completed tasks. Provided rename() is atomic, there's a guarantee that every task's completed dir is valid; provided it's O(1), it's an inexpensive operation

h3. v2

# task commit: copy straight to dest (this is where co-ordination is needed between tasks and job manager) 
# job commit: no-op
# job restart: none, start again
# failure during task commit: dest dir in unknown state, job restart needed
# failure during job commit: job in unknown state, rm $dest and rerun

Given that Spark doesn't do job restart, switching to v2 everywhere reduces the number of renames, but makes recovery from a failure during task commit impossible.

Neither algorithm is correct on an inconsistent S3 endpoint, as both can get incorrect listings of the files to COPY + DELETE. Even with consistency, you still have O(data) task commits in both algorithms, and another O(data) job commit with v1. Azure WASB is consistent and uses leases for exclusive access to parts of the store on commit, but even it would benefit from a store-specific committer. Rename is not the solution to committing data in an object store.



> OutputCommitCoordinator should not allow another task to commit after an ExecutorFailure
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-19790
>                 URL: https://issues.apache.org/jira/browse/SPARK-19790
>             Project: Spark
>          Issue Type: Bug
>          Components: Scheduler
>    Affects Versions: 2.1.0
>            Reporter: Imran Rashid
>
> The OutputCommitCoordinator resets the allowed committer when the task fails.  https://github.com/apache/spark/blob/8aa560b75e6b083b2a890c52301414285ba35c3d/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala#L143
> However, if a task fails because of an ExecutorFailure, we actually have no idea what the status is of the task.  The task may actually still be running, and perhaps successfully commit its output.  By allowing another task to commit its output, there is a chance that multiple tasks commit, which can result in corrupt output.  This would be particularly problematic when commit() is an expensive operation, e.g. moving files on S3.
> For other task failures, we can allow other tasks to commit.  But with an ExecutorFailure, it's not clear what the right thing to do is.  The only safe thing to do may be to fail the job.
> This is related to SPARK-19631, and was discovered during discussion on that PR https://github.com/apache/spark/pull/16959#discussion_r103549134



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
