Posted to issues@spark.apache.org by "xukun (JIRA)" <ji...@apache.org> on 2016/12/09 12:06:58 UTC
[jira] [Commented] (SPARK-5259) Do not submit stage until its dependencies map outputs are registered
[ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15735160#comment-15735160 ]
xukun commented on SPARK-5259:
------------------------------
[~squito] [~SuYan] Would it be possible to backport this to branch 1.5?
> Do not submit stage until its dependencies map outputs are registered
> ---------------------------------------------------------------------
>
> Key: SPARK-5259
> URL: https://issues.apache.org/jira/browse/SPARK-5259
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.1.1, 1.2.0
> Reporter: SuYan
> Assignee: SuYan
> Priority: Critical
> Fix For: 1.6.0
>
>
> We should track pending tasks by partition ID instead of Task objects.
> Before this fix, failure & retry could result in a stage being submitted before its dependencies' map outputs were registered. This was due to an error in the condition for registering map outputs.
> More complete explanation of the original problem:
> 1. While a shuffle stage is being retried, there may be 2 TaskSets running for it.
> Call them taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks of taskSet0.0 that have not yet completed.
> If taskSet0.0 completes tasks that taskSet0.1 has not yet completed, but which cover all the partitions,
> then stage.isAvailable becomes true.
> {code}
> def isAvailable: Boolean = {
>   if (!isShuffleMap) {
>     true
>   } else {
>     numAvailableOutputs == numPartitions
>   }
> }
> {code}
> But stage.pendingTasks is not empty, which is supposed to guard the registration of map statuses in mapOutputTracker.
> The problem is that when a task completes successfully, pendingTasks removes the Task by object reference, because Task does not override hashCode() and equals():
> pendingTasks -= task
> whereas numAvailableOutputs is counted by partition ID. So a resubmitted task for the same partition is a different object, its stale entry is never removed, and the two bookkeeping mechanisms disagree.
> Here is a test case demonstrating the problem:
> {code}
> test("Make sure mapStage.pendingTasks is empty " +
>   "while mapStage.isAvailable is true after the stage was retried") {
>   val firstRDD = new MyRDD(sc, 6, Nil)
>   val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>   val firstShuffleId = firstShuffleDep.shuffleId
>   val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>   val shuffleId = shuffleDep.shuffleId
>   val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>   submit(reduceRdd, Array(0, 1))
>   complete(taskSets(0), Seq(
>     (Success, makeMapStatus("hostB", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostC", 3)),
>     (Success, makeMapStatus("hostB", 4)),
>     (Success, makeMapStatus("hostB", 5)),
>     (Success, makeMapStatus("hostC", 6))
>   ))
>   complete(taskSets(1), Seq(
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1))
>   ))
>   runEvent(ExecutorLost("exec-hostA"))
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(0),
>     FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
>     null, null, null, null))
>   scheduler.resubmitFailedStages()
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>     makeMapStatus("hostB", 2), null, null, null))
>   val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>   assert(stage.attemptId == 2)
>   assert(stage.isAvailable)
>   assert(stage.pendingTasks.size == 0)
> }
> {code}
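The root cause described in the quoted issue (removal by object identity vs. accounting by partition ID) can be sketched outside of Spark. The following is a minimal, hypothetical illustration, where `Task` is a stand-in class rather than Spark's real Task: removing a re-created task from a HashSet is a no-op when equals()/hashCode() are not overridden, while a set of partition IDs drains correctly.

```scala
import scala.collection.mutable

// Stand-in for a scheduler task. Like the Task in the affected Spark
// versions, it does NOT override equals/hashCode, so HashSet falls back
// to object-identity semantics.
class Task(val partitionId: Int)

val original = new Task(0)
val pendingTasks = mutable.HashSet[Task](original)

// A resubmitted attempt for the same partition is a distinct object...
val resubmitted = new Task(0)
pendingTasks -= resubmitted
// ...so the removal is a no-op and the stale entry survives:
// pendingTasks still contains `original`, and never drains.

// Tracking pending work by partition ID instead is idempotent across
// retries, matching how numAvailableOutputs is counted.
val pendingPartitions = mutable.HashSet[Int](original.partitionId)
pendingPartitions -= resubmitted.partitionId
// pendingPartitions is now empty, as expected.
```

This mirrors the fix's direction: keying pending work on partition IDs keeps the pending-set consistent with the partition-based output count even when task objects are re-created on retry.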
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org