You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "weixiuli (Jira)" <ji...@apache.org> on 2019/10/22 10:50:00 UTC
[jira] [Created] (SPARK-29551) There is a bug about fetch failed
when an executor lost
weixiuli created SPARK-29551:
--------------------------------
Summary: There is a bug about fetch failed when an executor lost
Key: SPARK-29551
URL: https://issues.apache.org/jira/browse/SPARK-29551
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.3
Reporter: weixiuli
Fix For: 2.4.3
There is a regression when an executor is lost and that loss then causes a 'fetch failed' error.
We can add a unit test to 'DAGSchedulerSuite.scala' to catch the above problem.
{code}
// Reproduction test: three map outputs per shuffle are registered (two from executors on
// hostA, one from hostB), exec-hostA2 is then lost, and a reduce task reports a FetchFailed
// against exec-hostA2. The final assertions expect that only the hostB output is still
// registered for BOTH shuffles.
test("All shuffle files on the slave should be cleaned up when slave lost test") {
// reset the test context with the right shuffle service config:
// external shuffle service on, and host-wide unregistration on fetch failure on
afterEach()
val conf = new SparkConf()
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
init(conf)
// two executors share hostA, a third one lives on hostB
runEvent(ExecutorAdded("exec-hostA1", "hostA"))
runEvent(ExecutorAdded("exec-hostA2", "hostA"))
runEvent(ExecutorAdded("exec-hostB", "hostB"))
// build a three-RDD lineage joined by two shuffle dependencies:
// firstRDD --(firstShuffleDep)--> shuffleMapRdd --(shuffleDep)--> reduceRdd
// NOTE(review): MyRDD is a suite helper not shown here; its second argument is presumably
// the partition count -- confirm against DAGSchedulerSuite
val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
val secondShuffleId = shuffleDep.shuffleId
val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
submit(reduceRdd, Array(0))
// map stage1 completes successfully, with one task on each executor
// (map task ids 5, 6 and 7 are checked again in the setup assertions below)
complete(taskSets(0), Seq(
(Success,
MapStatus(
BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
(Success,
MapStatus(
BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
(Success, makeMapStatus("hostB", 1, mapTaskId = 7))
))
// map stage2 completes successfully, with one task on each executor
// (map task ids 8, 9 and 10 are checked again in the setup assertions below)
complete(taskSets(1), Seq(
(Success,
MapStatus(
BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
(Success,
MapStatus(
BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
(Success, makeMapStatus("hostB", 1, mapTaskId = 10))
))
// make sure our test setup is correct: the first shuffle has three registered outputs,
// one per executor, carrying the map task ids given above
val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))
// same setup check for the second shuffle
val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(initialMapStatus2.count(_ != null) === 3)
assert(initialMapStatus2.map{_.location.executorId}.toSet ===
Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10))
// kill exec-hostA2 BEFORE the fetch failure is reported; this ordering is the scenario
// under test
runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
// the reduce stage fails with a fetch failure that names exec-hostA2 and the second shuffle
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
secondShuffleId, 0L, 0, 0, "ignored"),
null)
))
// Here is the main assertion -- make sure that we de-register
// the map outputs of both map stages from both executors on hostA,
// so that only the entry at index 2 (the hostB output) remains non-null
val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
assert(mapStatus1.count(_ != null) === 1)
assert(mapStatus1(2).location.executorId === "exec-hostB")
assert(mapStatus1(2).location.host === "hostB")
// the same expectation for the second shuffle, the one named in the FetchFailed
val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
assert(mapStatus2.count(_ != null) === 1)
assert(mapStatus2(2).location.executorId === "exec-hostB")
assert(mapStatus2(2).location.host === "hostB")
}
{code}
The error output is:
{code}
3 did not equal 1
ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609)
Expected :1
Actual :3
<Click to see difference>
org.scalatest.exceptions.TestFailedException: 3 did not equal 1
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org