You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Sean R. Owen (Jira)" <ji...@apache.org> on 2019/10/22 14:04:00 UTC
[jira] [Updated] (SPARK-29551) There is a bug about fetch failed
when an executor lost
[ https://issues.apache.org/jira/browse/SPARK-29551?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean R. Owen updated SPARK-29551:
---------------------------------
Fix Version/s: (was: 2.4.3)
Target Version/s: (was: 2.4.3, 2.4.5, 3.0.0)
Priority: Major (was: Blocker)
Don't set blocker or target / fix versions please.
> There is a bug about fetch failed when an executor lost
> --------------------------------------------------------
>
> Key: SPARK-29551
> URL: https://issues.apache.org/jira/browse/SPARK-29551
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3
> Reporter: weixiuli
> Priority: Major
>
> There is a regression when an executor is lost and a 'fetch failed' error then occurs.
> We can add a unit test to 'DAGSchedulerSuite.scala' to catch the above problem.
> {code}
> test("All shuffle files on the slave should be cleaned up when slave lost test") {
> // reset the test context with the shuffle service and host-level unregistration enabled
> afterEach()
> val conf = new SparkConf()
> conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
> conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
> init(conf)
> runEvent(ExecutorAdded("exec-hostA1", "hostA"))
> runEvent(ExecutorAdded("exec-hostA2", "hostA"))
> runEvent(ExecutorAdded("exec-hostB", "hostB"))
> val firstRDD = new MyRDD(sc, 3, Nil)
> val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
> val firstShuffleId = firstShuffleDep.shuffleId
> val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
> val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
> val secondShuffleId = shuffleDep.shuffleId
> val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
> submit(reduceRdd, Array(0))
> // map stage 1 completes successfully, with one task on each executor
> complete(taskSets(0), Seq(
> (Success,
> MapStatus(
> BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
> (Success,
> MapStatus(
> BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
> (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
> ))
> // map stage 2 completes successfully, with one task on each executor
> complete(taskSets(1), Seq(
> (Success,
> MapStatus(
> BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
> (Success,
> MapStatus(
> BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
> (Success, makeMapStatus("hostB", 1, mapTaskId = 10))
> ))
> // make sure our test setup is correct: each shuffle has three registered map outputs
> val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
> // val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
> assert(initialMapStatus1.count(_ != null) === 3)
> assert(initialMapStatus1.map{_.location.executorId}.toSet ===
> Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
> assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))
> val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
> // NOTE(review): stale copy of the commented-out line above (it named initialMapStatus1) -- TODO remove
> assert(initialMapStatus2.count(_ != null) === 3)
> assert(initialMapStatus2.map{_.location.executorId}.toSet ===
> Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
> assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10))
> // lose exec-hostA2 (reason: ExecutorKilled)
> runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
> // the reduce stage fails with a fetch failure for the second shuffle's output on exec-hostA2
> complete(taskSets(2), Seq(
> (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
> secondShuffleId, 0L, 0, 0, "ignored"),
> null)
> ))
> // Here is the main assertion -- make sure that we de-register
> // the map outputs of both map stages from both executors on hostA
> val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
> assert(mapStatus1.count(_ != null) === 1)
> assert(mapStatus1(2).location.executorId === "exec-hostB")
> assert(mapStatus1(2).location.host === "hostB")
> val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
> assert(mapStatus2.count(_ != null) === 1)
> assert(mapStatus2(2).location.executorId === "exec-hostB")
> assert(mapStatus2(2).location.host === "hostB")
> }
> {code}
> The error output is:
> {code}
> 3 did not equal 1
> ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609)
> Expected :1
> Actual :3
> <Click to see difference>
> org.scalatest.exceptions.TestFailedException: 3 did not equal 1
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org