Posted to issues@spark.apache.org by "wuyi (Jira)" <ji...@apache.org> on 2021/08/24 06:38:01 UTC

[jira] [Resolved] (SPARK-36558) Stage with all tasks finished but ongoing finalization can cause job hang

     [ https://issues.apache.org/jira/browse/SPARK-36558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

wuyi resolved SPARK-36558.
--------------------------
    Resolution: Won't Fix

> Stage with all tasks finished but ongoing finalization can cause job hang
> -------------------------------------------------------------------------
>
>                 Key: SPARK-36558
>                 URL: https://issues.apache.org/jira/browse/SPARK-36558
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>    Affects Versions: 3.2.0, 3.3.0
>            Reporter: wuyi
>            Priority: Blocker
>
>  
> A stage whose tasks have all finished but whose shuffle merge finalization is still ongoing can lead to a job hang. The problem is that such a stage is considered a "missing" stage (see [https://github.com/apache/spark/blob/a47ceaf5492040063e31e17570678dc06846c36c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L719-L721]), which breaks the original assumption that a "missing" stage must have tasks to run.
> Normally, if stage A is the parent of (result) stage B and all of stage A's tasks have finished, stage A is skipped entirely when stage B is submitted. With this bug, however, stage A is submitted anyway; and because submitting a stage that has no tasks to run never adds its child stage to the waiting stage list, the job ends up hanging.
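> Below is a minimal, self-contained sketch (hypothetical names, not the actual DAGScheduler API) of the missing-parent decision described above: a parent shuffle map stage keeps being reported as "missing" until its shuffle merge is finalized, even after all of its tasks have finished.
> {code:java}
> // Hypothetical model of the decision at the linked DAGScheduler lines:
> // a parent stage counts as "missing" if its map outputs are unavailable OR
> // its shuffle merge finalization has not completed yet.
> case class ParentStageState(allOutputsAvailable: Boolean, shuffleMergeFinalized: Boolean)
>
> def isMissing(parent: ParentStageState): Boolean =
>   !parent.allOutputsAvailable || !parent.shuffleMergeFinalized
>
> // All tasks finished (outputs available) but finalization still ongoing:
> // the stage is still treated as missing, so its child resubmits it.
> assert(isMissing(ParentStageState(allOutputsAvailable = true, shuffleMergeFinalized = false)))
> {code}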
>  
> To reproduce the issue:
> First, change `MyRDD` so that it can actually compute records:
> {code:java}
> override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
>   Iterator.single((1, 1))
> }{code}
>  Then run this test:
> {code:java}
> test("Job hang") {
>   initPushBasedShuffleConfs(conf)
>   conf.set(config.SHUFFLE_MERGER_LOCATIONS_MIN_STATIC_THRESHOLD, 5)
>   DAGSchedulerSuite.clearMergerLocs
>   DAGSchedulerSuite.addMergerLocs(Seq("host1", "host2", "host3", "host4", "host5"))
>   val latch = new CountDownLatch(1)
>   val myDAGScheduler = new MyDAGScheduler(
>     sc,
>     sc.dagScheduler.taskScheduler,
>     sc.listenerBus,
>     sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
>     sc.env.blockManager.master,
>     sc.env) {
>     override def scheduleShuffleMergeFinalize(stage: ShuffleMapStage): Unit = {
>       // Deliberately skip scheduling the real finalization so we can mimic a stage
>       // whose tasks have all finished but whose shuffle merge finalization never completes.
>       latch.countDown()
>     }
>   }
>   sc.dagScheduler = myDAGScheduler
>   sc.taskScheduler.setDAGScheduler(myDAGScheduler)
>   val parts = 20
>   val shuffleMapRdd = new MyRDD(sc, parts, Nil)
>   val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(parts))
>   val reduceRdd1 = new MyRDD(sc, parts, List(shuffleDep), tracker = mapOutputTracker)
>   reduceRdd1.countAsync()
>   latch.await()
>   // scalastyle:off
>   println("=========after wait==========")
>   // Setting _shuffleMergedFinalized to true here would avoid the hang:
>   // shuffleDep._shuffleMergedFinalized = true
>   val reduceRdd2 = new MyRDD(sc, parts, List(shuffleDep))
>   reduceRdd2.count()
> }
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org