Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/07/27 18:06:04 UTC

[jira] [Commented] (SPARK-5259) Fix endless retry stage by add task equal() and hashcode() to avoid stage.pendingTasks not empty while stage map output is available

    [ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642927#comment-14642927 ] 

Apache Spark commented on SPARK-5259:
-------------------------------------

User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/7699

> Fix endless retry stage by add task equal() and hashcode() to avoid stage.pendingTasks not empty while stage map output is available 
> -------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>
> 1. When a shuffle stage is retried, two task sets may be running at the same time.
> Call them taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks that taskSet0.0 had not completed.
> If taskSet0.0 then finishes those tasks before taskSet0.1 does, every partition has an output even though taskSet0.1 has not completed them,
> so stage.isAvailable is true:
> {code}
>   def isAvailable: Boolean = {
>     if (!isShuffleMap) {
>       true
>     } else {
>       numAvailableOutputs == numPartitions
>     }
>   } 
> {code}
> However, stage.pendingTasks is not empty, which prevents the map statuses from being registered in mapOutputTracker.
> When a task completes successfully, it is removed from pendingTasks by reference, because Task does not override hashCode() and equals():
> pendingTasks -= task
> numAvailableOutputs, on the other hand, is tracked by partition ID.
> The following test case demonstrates the problem:
> {code}
>   test("mapStage.pendingTasks is empty " +
>     "when mapStage.isAvailable is true after a stage retry") {
>     val firstRDD = new MyRDD(sc, 6, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>     val firstShuffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>     val shuffleId = shuffleDep.shuffleId
>     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>     submit(reduceRdd, Array(0, 1))
>     // Complete all 6 tasks of the first map stage.
>     complete(taskSets(0), Seq(
>       (Success, makeMapStatus("hostB", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostC", 3)),
>       (Success, makeMapStatus("hostB", 4)),
>       (Success, makeMapStatus("hostB", 5)),
>       (Success, makeMapStatus("hostC", 6))
>     ))
>     // Complete 5 of the 6 tasks of the second map stage.
>     complete(taskSets(1), Seq(
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1)),
>       (Success, makeMapStatus("hostB", 2)),
>       (Success, makeMapStatus("hostA", 1))
>     ))
>     // Lose the executor on hostA, then report a fetch failure so the stage is retried.
>     runEvent(ExecutorLost("exec-hostA"))
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(0),
>       FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
>       null, null, null, null))
>     scheduler.resubmitFailedStages()
>     runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>       makeMapStatus("hostC", 1), null, null, null))
>     runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>       makeMapStatus("hostB", 2), null, null, null))
>     val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>     assert(stage.attemptId == 2)
>     assert(stage.isAvailable)
>     assert(stage.pendingTasks.size == 0)
>   }
> {code}
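
The reference-equality behaviour described above can be reproduced outside Spark. The following is a minimal sketch, not Spark code: RefTask, ValueTask, PendingTasksDemo and pendingAfterCompletion are hypothetical names introduced here for illustration. It assumes only that two stage attempts create separate task objects for the same partition, and compares a class without equals()/hashCode() against a case class, which derives both from its fields.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for scheduler tasks; these are NOT Spark's Task classes.

// No equals()/hashCode() override: instances compare by reference.
class RefTask(val stageId: Int, val partitionId: Int)

// Case class: equals()/hashCode() are derived from (stageId, partitionId).
case class ValueTask(stageId: Int, partitionId: Int)

object PendingTasksDemo {
  // Puts one task into a pending set, removes the "completed" task,
  // and returns how many tasks are still pending afterwards.
  def pendingAfterCompletion[T](pendingTask: T, completedTask: T): Int = {
    val pending = mutable.HashSet[T](pendingTask)
    pending -= completedTask
    pending.size
  }

  def main(args: Array[String]): Unit = {
    // Two attempts build distinct objects for the same stage and partition.
    val byRef = pendingAfterCompletion(new RefTask(1, 0), new RefTask(1, 0))
    val byValue = pendingAfterCompletion(ValueTask(1, 0), ValueTask(1, 0))
    println(s"reference equality: $byRef task(s) still pending")
    println(s"value equality: $byValue task(s) still pending")
  }
}
```

With reference equality, the completed task from one attempt does not remove the equivalent task registered by the other attempt, so the pending set stays non-empty even though the partition's output exists. With field-based equality the entry is removed.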


