Posted to issues@spark.apache.org by "wuyi (Jira)" <ji...@apache.org> on 2021/08/23 03:08:00 UTC

[jira] [Updated] (SPARK-36558) Stage has all tasks finished but with ongoing finalization can cause job hang

     [ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi updated SPARK-36558:
-------------------------
    Description: 
 

A stage whose tasks have all finished but whose shuffle merge finalization is still ongoing can lead to a job hang. The problem is that such a stage is considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]). This breaks the original assumption that a "missing" stage must have tasks to run.
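
For reference, the check at the linked lines looks roughly like this (a paraphrased sketch of the shuffle-dependency branch in DAGScheduler.getMissingParentStages, not a verbatim copy of the source):
{code:java}
// Paraphrased sketch of the missing-parent check in
// DAGScheduler.getMissingParentStages (see the link above).
case shufDep: ShuffleDependency[_, _, _] =>
  val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
  // Even when every task of mapStage has finished, the stage still
  // counts as "missing" while its shuffle merge is not yet finalized.
  if (!mapStage.isAvailable || !mapStage.shuffleDep.shuffleMergeFinalized) {
    missing += mapStage
  }
{code}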

Normally, if stage A is the parent of (result) stage B and all tasks of stage A have finished, stage A is skipped directly when stage B is submitted. With this bug, however, stage A is submitted anyway. And submitting a stage that has no tasks to run fails to add the child stage to the waiting stage list, which ultimately leaves the job hanging, as sketched below.
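
In simplified form, the scheduling loop involved looks roughly like this (a sketch of DAGScheduler.submitStage with the job-id lookup, guards, and error handling omitted; not the exact source):
{code:java}
// Simplified sketch of DAGScheduler.submitStage; bodies abbreviated.
private def submitStage(stage: Stage): Unit = {
  val missing = getMissingParentStages(stage).sortBy(_.id)
  if (missing.isEmpty) {
    // Stage A ends up here: all of its tasks have already finished,
    // so no tasks are launched, and child stage B is never pulled
    // out of waitingStages again.
    submitMissingTasks(stage, jobId) // jobId lookup omitted in this sketch
  } else {
    // Stage B ends up here: A stays "missing" while its shuffle merge
    // finalization never completes, so B waits forever.
    missing.foreach(submitStage)
    waitingStages += stage
  }
}
{code}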

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // Skip the real finalization so we can mimic a stage whose tasks
      // have all finished but whose shuffle merge finalization is incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // Setting _shuffleMergedFinalized to true here would avoid the hang:
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
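
Note that uncommenting the _shuffleMergedFinalized line above avoids the hang only because it lets shuffleDep pass the shuffleMergeFinalized part of the missing-stage check; it demonstrates the root cause rather than providing a proper fix.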
 

  was:
 

A stage whose tasks have all finished but whose shuffle merge finalization is still ongoing can lead to a job hang. The problem is that such a stage is considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]). This breaks the original assumption that a "missing" stage must have tasks to run.

Normally, if stage A is the parent of (result) stage B and all tasks of stage A have finished, stage A is skipped directly when stage B is submitted. With this bug, however, stage A is submitted, which ultimately leaves the job hanging.

 

The example to reproduce:
{code:java}
test("Job hang") {
  initPushBasedShuffleConfs(conf)
  conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
  DAGSchedulerSuite.clearMergerLocs
  DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
  val latch = new CountDownLatch(1)
  val myDAGScheduler = new MyDAGScheduler(
    sc,
    sc.dagScheduler.taskScheduler,
    sc.listenerBus,
    sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
    sc.env.blockManager.master,
    sc.env) {
    override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
      // Skip the real finalization so we can mimic a stage whose tasks
      // have all finished but whose shuffle merge finalization is incomplete.
      latch.countDown()
    }
  }
  sc.dagScheduler = myDAGScheduler
  sc.taskScheduler.setDAGScheduler(myDAGScheduler)
  val parts = 20
  val shuffleMapRdd = new MyRDD(sc, parts, Nil)
  val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
  val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
  reduceRdd1.countAsync()
  latch.await()
  // Setting _shuffleMergedFinalized to true here would avoid the hang:
  // shuffleDep._shuffleMergedFinalized = true
  val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
  reduceRdd2.count()
}
{code}
 


> Stage has all tasks finished but with ongoing finalization can cause job hang
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-36558
>                 URL: https://issues.apache.org/jira/browse/SPARK-36558
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org