Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/22 14:04:00 UTC

[jira] [Updated] (SPARK-29551) There is a bug about fetch failed when an executor lost

     [ https://issues.apache.org/jira/browse/SPARK-29551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean R. Owen updated SPARK-29551:
---------------------------------
       Fix Version/s:     (was: 2.4.3)
    Target Version/s:   (was: 2.4.3, 2.4.5, 3.0.0)
            Priority: Major  (was: Blocker)

Don't set blocker or target / fix versions please.

> There is a bug about fetch failed when an executor lost 
> --------------------------------------------------------
>
>                 Key: SPARK-29551
>                 URL: https://issues.apache.org/jira/browse/SPARK-29551
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>            Reporter: weixiuli
>            Priority: Major
>
> There is a regression when an executor is lost and this subsequently causes a 'fetch failed' error: the map outputs on the lost executor's host are not unregistered as expected.
> We can add a unit test in 'DAGSchedulerSuite.scala' to catch the above problem:
> {code}
> test("All shuffle files on the slave should be cleaned up when slave lost test") {
>     // reset the test context with the right shuffle service config
>     afterEach()
>     val conf = new SparkConf()
>     conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
>     conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
>     init(conf)
>     runEvent(ExecutorAdded("exec-hostA1", "hostA"))
>     runEvent(ExecutorAdded("exec-hostA2", "hostA"))
>     runEvent(ExecutorAdded("exec-hostB", "hostB"))
>     val firstRDD = new MyRDD(sc, 3, Nil)
>     val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
>     val firstShuffleId = firstShuffleDep.shuffleId
>     val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
>     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
>     val secondShuffleId = shuffleDep.shuffleId
>     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
>     submit(reduceRdd, Array(0))
>     // map stage1 completes successfully, with one task on each executor
>     complete(taskSets(0), Seq(
>       (Success,
>         MapStatus(
>           BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
>       (Success,
>         MapStatus(
>           BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
>       (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
>     ))
>     // map stage2 completes successfully, with one task on each executor
>     complete(taskSets(1), Seq(
>       (Success,
>         MapStatus(
>           BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
>       (Success,
>         MapStatus(
>           BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
>       (Success, makeMapStatus("hostB", 1, mapTaskId = 10))
>     ))
>     // make sure our test setup is correct
>     val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
>     assert(initialMapStatus1.count(_ != null) === 3)
>     assert(initialMapStatus1.map{_.location.executorId}.toSet ===
>       Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
>     assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))
>     val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
>     assert(initialMapStatus2.count(_ != null) === 3)
>     assert(initialMapStatus2.map{_.location.executorId}.toSet ===
>       Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
>     assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10))
>     // kill exec-hostA2
>     runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
>     // reduce stage fails with a fetch failure from map stage from exec-hostA2
>     complete(taskSets(2), Seq(
>       (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
>         secondShuffleId, 0L, 0, 0, "ignored"),
>         null)
>     ))
>     // Here is the main assertion -- make sure that we de-register
> >     // the map outputs for both map stages from both executors on hostA
>     val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
>     assert(mapStatus1.count(_ != null) === 1)
>     assert(mapStatus1(2).location.executorId === "exec-hostB")
>     assert(mapStatus1(2).location.host === "hostB")
>     val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
>     assert(mapStatus2.count(_ != null) === 1)
>     assert(mapStatus2(2).location.executorId === "exec-hostB")
>     assert(mapStatus2(2).location.host === "hostB")
>   }
> {code}
> The error output is:
> {code}
> 3 did not equal 1
> ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609)
> Expected :1
> Actual   :3
> org.scalatest.exceptions.TestFailedException: 3 did not equal 1
> {code}
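> For illustration, here is a minimal sketch (not the proposed fix) of the host-level cleanup the test expects, assuming the MapOutputTrackerMaster API already used in the test above: with 'spark.files.fetchFailure.unRegisterOutputOnHost' enabled, handling the fetch failure from hostA should drop every map output located on hostA for both shuffles, leaving only the hostB entries.
> {code}
> // Sketch only: the end state the assertions expect after the FetchFailed from hostA
> // is handled, i.e. all map outputs on hostA are unregistered for both shuffles.
> mapOutputTracker.removeOutputsOnHost("hostA")
> assert(mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses.count(_ != null) === 1)
> assert(mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses.count(_ != null) === 1)
> {code}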



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
