Posted to issues@spark.apache.org by "SuYan (JIRA)" <ji...@apache.org> on 2015/01/15 05:10:34 UTC

[jira] [Updated] (SPARK-5259) Add Task equals() and hashCode() so that stage.pendingTasks stays accurate when a stage is retried

     [ https://issues.apache.org/jira/browse/SPARK-5259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

SuYan updated SPARK-5259:
-------------------------
    Description: 
1. When a shuffle map stage is retried, two TaskSets for the same stage may be running at the same time.

Call them taskSet0.0 and taskSet0.1: taskSet0.1 re-runs the tasks that taskSet0.0 has not yet completed.

If taskSet0.0 then happens to finish all the tasks that taskSet0.1 has not completed yet, the stage's isAvailable becomes true:

{code}
  def isAvailable: Boolean = {
    if (!isShuffleMap) {
      true
    } else {
      numAvailableOutputs == numPartitions
    }
  }
{code}

but stage.pendingTasks is still not empty, which prevents the DAGScheduler from registering the stage's map outputs (mapStatus) with the mapOutputTracker.

The cause: when a task completes successfully, the scheduler removes it with

pendingTasks -= task

and because Task does not override hashCode() and equals(), that removal only works at the reference level: a Task object completed by taskSet0.0 never matches the distinct Task object pending in taskSet0.1 for the same partition, so it stays in the set. numAvailableOutputs, on the other hand, is counted by partition ID, so it can reach numPartitions while pendingTasks is still non-empty.
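To see the mismatch in isolation, here is a minimal, self-contained sketch in plain Scala (FakeTask and the variable names are hypothetical, not Spark code): removing a structurally identical but distinct object from a HashSet is a no-op when the class relies on reference identity, while partition-ID bookkeeping sees the partition as done.

{code}
import scala.collection.mutable.HashSet

// Stand-in for Spark's Task, which does not override equals()/hashCode(),
// so HashSet falls back to reference identity.
class FakeTask(val stageId: Int, val partitionId: Int)

object PendingTasksDemo extends App {
  // Two attempts of the same stage each create their own Task object
  // for partition 3.
  val taskFromAttempt0 = new FakeTask(0, 3)
  val taskFromAttempt1 = new FakeTask(0, 3)

  val pendingTasks = HashSet[FakeTask](taskFromAttempt1)
  pendingTasks -= taskFromAttempt0 // no-op: different object identity

  // numAvailableOutputs-style bookkeeping is keyed by partition ID instead.
  val availablePartitions = HashSet[Int](taskFromAttempt0.partitionId)

  println(pendingTasks.isEmpty)     // false -- the pending copy was not removed
  println(availablePartitions.size) // 1 -- partition 3 counts as available
}
{code}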

Here is a test case that reproduces the problem:

{code}
  test("Make sure mapStage.pendingtasks is set() " +
    "while MapStage.isAvailable is true while stage was retry ") {
    val firstRDD = new MyRDD(sc, 6, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, null)
    val firstShuyffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 6, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
    val shuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
    submit(reduceRdd, Array(0, 1))
    complete(taskSets(0), Seq(
      (Success, makeMapStatus("hostB", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostC", 3)),
      (Success, makeMapStatus("hostB", 4)),
      (Success, makeMapStatus("hostB", 5)),
      (Success, makeMapStatus("hostC", 6))
    ))
    complete(taskSets(1), Seq(
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1)),
      (Success, makeMapStatus("hostB", 2)),
      (Success, makeMapStatus("hostA", 1))
    ))
    runEvent(ExecutorLost("exec-hostA"))
    runEvent(CompletionEvent(taskSets(1).tasks(0), Resubmitted, null, null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Resubmitted, null, null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(0),
      FetchFailed(null, firstShuyffleId, -1, 0, "Fetch Mata data failed"),
      null, null, null, null))
    scheduler.resubmitFailedStages()
    runEvent(CompletionEvent(taskSets(1).tasks(0), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(2), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(4), Success,
      makeMapStatus("hostC", 1), null, null, null))
    runEvent(CompletionEvent(taskSets(1).tasks(5), Success,
      makeMapStatus("hostB", 2), null, null, null))
    val stage = scheduler.stageIdToStage(taskSets(1).stageId)
    assert(stage.attemptId == 2)
    assert(stage.isAvailable)
    assert(stage.pendingTasks.size == 0)
  }


{code}
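
The fix suggested by the title is to override equals() and hashCode() on Task so that two Task instances for the same stage and partition compare equal, letting pendingTasks -= task remove the pending copy regardless of which attempt produced it. A minimal sketch of that idea (the constructor fields follow Spark 1.x's Task; the actual patch may differ):

{code}
abstract class Task[T](val stageId: Int, val partitionId: Int) {
  // Compare by (stageId, partitionId) instead of object identity, so a
  // task completed by an earlier attempt matches the retry's pending copy.
  override def equals(other: Any): Boolean = other match {
    case t: Task[_] => t.stageId == stageId && t.partitionId == partitionId
    case _          => false
  }
  override def hashCode(): Int = 31 * stageId + partitionId
}
{code}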



> Add Task equals() and hashCode() so that stage.pendingTasks stays accurate when a stage is retried
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-5259
>                 URL: https://issues.apache.org/jira/browse/SPARK-5259
>             Project: Spark
>          Issue Type: Bug
>    Affects Versions: 1.1.1, 1.2.0
>            Reporter: SuYan
>             Fix For: 1.2.0
>
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org