Posted to issues@spark.apache.org by "Imran Rashid (JIRA)" <ji...@apache.org> on 2015/07/29 18:51:06 UTC

[jira] [Updated] (SPARK-5259) Fix endless retry stage by add task equal() and hashcode() to avoid stage.pendingTasks not empty while stage map output is available

     [ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Imran Rashid updated SPARK-5259:
--------------------------------
    Target Version/s: 1.5.0
            Priority: Blocker  (was: Major)

> Fix endless retry stage by add task equal() and hashcode() to avoid stage.pendingTasks not empty while stage map output is available 
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>            Priority: Blocker
>
> 1. When a shuffle stage is retried, two task sets may be running at the same time.
> Call them taskSet0.0 and taskSet0.1, where taskSet0.1 re-runs the tasks that taskSet0.0 has not completed.
> If taskSet0.0 then finishes all of the tasks that taskSet0.1 has not yet completed, every partition is covered,
> and stage.isAvailable is true:
> {code}
>   def isAvailable: Boolean = {
>     if (!isShuffleMap) {
>       true
>     } else {
>       numAvailableOutputs == numPartitions
>     }
>   } 
> {code}
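> However, stage.pendingTasks is not empty, which blocks the registration of the map statuses in mapOutputTracker.
> This happens because, when a task completes successfully, it is removed from pendingTasks by reference, since Task does not override hashCode() and equals():
> pendingTasks -= task
> A successful task from one task set therefore never removes the equivalent task object created for the other task set, whereas numAvailableOutputs is counted per partition ID.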
> Here is a test case that demonstrates the problem:
> {code}
>   test("Make sure mapStage.pendingtasks is set() " +
>     "while MapStage.isAvailable is true while stage was retry ") {
>     val firstRDD = new MyRDD(sc, 6, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>     val firstShuyffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>     val shuffleId = shuffleDep.shuffleId
>     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>     submit(reduceRdd, Array(0, 1))
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostC", 3)),
>       (Success, makeMapStatus("hostB", 4)),
>       (Success, makeMapStatus("hostB", 5)),
>       (Success, makeMapStatus("hostC", 6))
>     ))
>     complete(taskSets(1), Seq(
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1))
>     ))
>     runEvent(ExecutorLost("exec-hostA"))
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(0),
>       FetchFailed(null, firstShuyffleId, -1, 0, "Fetch metadata failed"),
>       null, null, null, null))
>     scheduler.resubmitFailedStages()
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>       makeMapStatus("hostB", 2), null, null, null))
>     val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>     assert(stage.attemptId == 2)
>     assert(stage.isAvailable)
>     assert(stage.pendingTasks.size == 0)
>   }
> {code}
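The reference-equality behaviour described in the issue can be reproduced outside Spark. The sketch below is not Spark's Task class: RefTask, KeyedTask, and PendingTasksDemo are hypothetical names, and defining equality on (stageId, partitionId) is only an assumption about what the equals()/hashCode() fix named in the title might look like. It shows that removing an equivalent but distinct object from a mutable HashSet has no effect unless equals() and hashCode() are overridden.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a scheduler task. It inherits the default
// reference-based equals() and hashCode() from AnyRef.
class RefTask(val stageId: Int, val partitionId: Int)

// Same fields, but equality is defined by (stageId, partitionId).
// This key choice is an assumption made for illustration.
class KeyedTask(val stageId: Int, val partitionId: Int) {
  override def equals(other: Any): Boolean = other match {
    case t: KeyedTask => t.stageId == stageId && t.partitionId == partitionId
    case _ => false
  }
  override def hashCode(): Int = 31 * stageId + partitionId
}

object PendingTasksDemo {
  // One task set registers a task for partition 0; a different task object
  // for the same partition (from the other task set) then reports success.
  // Returns how many tasks are still pending afterwards.
  def pendingAfterRetryWithRef(): Int = {
    val pending = mutable.HashSet[RefTask]()
    pending += new RefTask(0, 0) // object created for the first task set
    pending -= new RefTask(0, 0) // distinct object from the second task set
    pending.size
  }

  def pendingAfterRetryWithKey(): Int = {
    val pending = mutable.HashSet[KeyedTask]()
    pending += new KeyedTask(0, 0)
    pending -= new KeyedTask(0, 0) // equal by key, so it is removed
    pending.size
  }

  def main(args: Array[String]): Unit = {
    println(pendingAfterRetryWithRef()) // prints 1: the stale entry remains
    println(pendingAfterRetryWithKey()) // prints 0: the entry is removed
  }
}
```

With reference equality the set keeps a stale entry even though the partition's output exists, which mirrors the reported state where isAvailable is true while pendingTasks is non-empty.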
