Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/07/27 18:06:04 UTC
[jira] [Commented] (SPARK-5259) Fix endless retry stage by add task
equal() and hashcode() to avoid stage.pendingTasks not empty while stage
map output is available
[ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14642927#comment-14642927 ]
Apache Spark commented on SPARK-5259:
-------------------------------------
User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/7699
> Fix endless retry stage by add task equal() and hashcode() to avoid stage.pendingTasks not empty while stage map output is available
> -------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-5259
> URL: https://issues.apache.org/jira/browse/SPARK-5259
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 1.1.1, 1.2.0
> Reporter: SuYan
>
> 1. When a shuffle stage is retried, two task sets may be running at the same time.
> Call them taskSet0.0 and taskSet0.1; taskSet0.1 re-runs the tasks that taskSet0.0 has not completed.
> If taskSet0.0 then finishes the tasks that taskSet0.1 has not completed yet, so that all partitions are covered,
> stage.isAvailable becomes true:
> {code}
> def isAvailable: Boolean = {
>   if (!isShuffleMap) {
>     true
>   } else {
>     numAvailableOutputs == numPartitions
>   }
> }
> {code}
> However, stage.pendingTasks is not empty, and the empty-pendingTasks check guards the registration of map statuses in the mapOutputTracker.
> When a task completes successfully, it is removed from pendingTasks by reference identity, because Task does not override hashCode() and equals():
> pendingTasks -= task
> numAvailableOutputs, on the other hand, is counted by partition ID.
> The following test case demonstrates the problem:
> {code}
> test("mapStage.pendingTasks should be empty " +
>   "when mapStage.isAvailable is true after a stage retry") {
>   // Two chained shuffle map stages (6 partitions each) feeding a 2-partition reduce stage.
>   val firstRDD = new MyRDD(sc, 6, Nil)
>   val firstShuffleDep = new ShuffleDependency(firstRDD, null)
>   val firstShuffleId = firstShuffleDep.shuffleId
>   val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
>   val shuffleId = shuffleDep.shuffleId
>   val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
>   submit(reduceRdd, Array(0, 1))
>   // Complete all six tasks of the first map stage.
>   complete(taskSets(0), Seq(
>     (Success, makeMapStatus("hostB", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostC", 3)),
>     (Success, makeMapStatus("hostB", 4)),
>     (Success, makeMapStatus("hostB", 5)),
>     (Success, makeMapStatus("hostC", 6))
>   ))
>   // Complete only five tasks of the second map stage.
>   complete(taskSets(1), Seq(
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1)),
>     (Success, makeMapStatus("hostB", 2)),
>     (Success, makeMapStatus("hostA", 1))
>   ))
>   // Lose the executor on hostA, then report resubmissions and a fetch failure
>   // so that the stage is retried.
>   runEvent(ExecutorLost("exec-hostA"))
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(0),
>     FetchFailed(null, firstShuffleId, -1, 0, "Fetch metadata failed"),
>     null, null, null, null))
>   scheduler.resubmitFailedStages()
>   // Tasks from the earlier task set now finish successfully.
>   runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
>     makeMapStatus("hostC", 1), null, null, null))
>   runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
>     makeMapStatus("hostB", 2), null, null, null))
>   val stage = scheduler.stageIdToStage(taskSets(1).stageId)
>   assert(stage.attemptId == 2)
>   assert(stage.isAvailable)
>   // Expected: once all outputs are available, no tasks remain pending.
>   assert(stage.pendingTasks.size == 0)
> }
> {code}
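The mismatch described above, where removal from pendingTasks works by reference while output availability is counted by partition, can be illustrated with a minimal standalone sketch. The classes IdentityTask and KeyedTask are hypothetical stand-ins invented for this example; they are not Spark's Task class, and the choice of (stageId, partitionId) as the equality key is an assumption based on the issue title, not the change made in the pull request.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a scheduler task (NOT Spark's Task class).
// It inherits equals()/hashCode() from AnyRef, so two instances are
// equal only if they are the same object.
class IdentityTask(val stageId: Int, val partitionId: Int)

// Same fields, but equality is defined by (stageId, partitionId).
// This key is an assumption made for illustration.
class KeyedTask(val stageId: Int, val partitionId: Int) {
  override def equals(other: Any): Boolean = other match {
    case t: KeyedTask => t.stageId == stageId && t.partitionId == partitionId
    case _            => false
  }
  override def hashCode(): Int = 31 * stageId + partitionId
}

object PendingTasksSketch {
  def main(args: Array[String]): Unit = {
    // The retried attempt registers its task for partition 0 as pending.
    val pendingByIdentity =
      mutable.HashSet[IdentityTask](new IdentityTask(1, 0))
    // A task object from the earlier attempt, same partition, completes.
    pendingByIdentity -= new IdentityTask(1, 0)
    // Nothing was removed: the two objects are distinct references.
    println(s"identity-based pending size: ${pendingByIdentity.size}")

    val pendingByKey = mutable.HashSet[KeyedTask](new KeyedTask(1, 0))
    pendingByKey -= new KeyedTask(1, 0)
    // Removed: the two objects compare equal by (stageId, partitionId).
    println(s"key-based pending size: ${pendingByKey.size}")
  }
}
```

With reference equality the pending set keeps its entry even though a task for the same partition has finished, which mirrors the state the test case above asserts against; with value equality the entry is removed.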