Posted to issues@spark.apache.org by "weixiuli (Jira)" <ji...@apache.org> on 2019/10/22 10:50:00 UTC

[jira] [Created] (SPARK-29551) There is a bug about fetch failures when an executor is lost

weixiuli created SPARK-29551:
--------------------------------

             Summary: There is a bug about fetch failures when an executor is lost
                 Key: SPARK-29551
                 URL: https://issues.apache.org/jira/browse/SPARK-29551
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.3
            Reporter: weixiuli
             Fix For: 2.4.3


There is a regression when an executor is lost and a 'fetch failed' error subsequently occurs: the map outputs on the affected host are not unregistered as expected.

We can add a unit test in 'DAGSchedulerSuite.scala' to catch the above problem.

{code}
test("All shuffle files on the slave should be cleaned up when slave lost test") {
    // reset the test context with the right shuffle service config
    afterEach()
    val conf = new SparkConf()
    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
    conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
    init(conf)
    runEvent(ExecutorAdded("exec-hostA1", "hostA"))
    runEvent(ExecutorAdded("exec-hostA2", "hostA"))
    runEvent(ExecutorAdded("exec-hostB", "hostB"))
    val firstRDD = new MyRDD(sc, 3, Nil)
    val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(3))
    val firstShuffleId = firstShuffleDep.shuffleId
    val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
    val secondShuffleId = shuffleDep.shuffleId
    val reduceRdd = new MyRDD(sc, 1, List(shuffleDep))
    submit(reduceRdd, Array(0))
    // map stage1 completes successfully, with one task on each executor
    complete(taskSets(0), Seq(
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 5)),
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 6)),
      (Success, makeMapStatus("hostB", 1, mapTaskId = 7))
    ))
    // map stage2 completes successfully, with one task on each executor
    complete(taskSets(1), Seq(
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA1", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 8)),
      (Success,
        MapStatus(
          BlockManagerId("exec-hostA2", "hostA", 12345), Array.fill[Long](1)(2), mapTaskId = 9)),
      (Success, makeMapStatus("hostB", 1, mapTaskId = 10))
    ))
    // make sure our test setup is correct
    val initialMapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
    assert(initialMapStatus1.count(_ != null) === 3)
    assert(initialMapStatus1.map{_.location.executorId}.toSet ===
      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
    assert(initialMapStatus1.map{_.mapId}.toSet === Set(5, 6, 7))

    val initialMapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
    assert(initialMapStatus2.count(_ != null) === 3)
    assert(initialMapStatus2.map{_.location.executorId}.toSet ===
      Set("exec-hostA1", "exec-hostA2", "exec-hostB"))
    assert(initialMapStatus2.map{_.mapId}.toSet === Set(8, 9, 10))

    // kill exec-hostA2
    runEvent(ExecutorLost("exec-hostA2", ExecutorKilled))
    // reduce stage fails with a fetch failure from map stage from exec-hostA2
    complete(taskSets(2), Seq(
      (FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345),
        secondShuffleId, 0L, 0, 0, "ignored"),
        null)
    ))
    // Here is the main assertion -- make sure that we de-register
    // the map outputs of both map stages from both executors on hostA
    val mapStatus1 = mapOutputTracker.shuffleStatuses(firstShuffleId).mapStatuses
    assert(mapStatus1.count(_ != null) === 1)
    assert(mapStatus1(2).location.executorId === "exec-hostB")
    assert(mapStatus1(2).location.host === "hostB")

    val mapStatus2 = mapOutputTracker.shuffleStatuses(secondShuffleId).mapStatuses
    assert(mapStatus2.count(_ != null) === 1)
    assert(mapStatus2(2).location.executorId === "exec-hostB")
    assert(mapStatus2(2).location.host === "hostB")
  }
{code}
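
For reference, the behaviour the test exercises is: with the external shuffle service enabled and spark.files.fetchFailure.unRegisterOutputOnHost set to true, a fetch failure reported against any executor on a host should unregister the map outputs of every executor on that host, for all shuffles. Below is a minimal, self-contained sketch of that cleanup rule; the Location, OutputTracker, and onFetchFailure names are hypothetical stand-ins for illustration only, not Spark's internal MapOutputTrackerMaster API.

{code}
// Hypothetical stand-ins for BlockManagerId and the map-output registry.
object FetchFailureCleanupSketch {
  final case class Location(executorId: String, host: String)

  final class OutputTracker {
    // shuffleId -> (map index -> location of that map output)
    private var statuses: Map[Int, Map[Int, Location]] = Map.empty

    def register(shuffleId: Int, mapIndex: Int, loc: Location): Unit =
      statuses += shuffleId -> (statuses.getOrElse(shuffleId, Map.empty[Int, Location]) + (mapIndex -> loc))

    def removeOutputsOnExecutor(executorId: String): Unit =
      statuses = statuses.map { case (s, m) => s -> m.filter(_._2.executorId != executorId) }

    def removeOutputsOnHost(host: String): Unit =
      statuses = statuses.map { case (s, m) => s -> m.filter(_._2.host != host) }

    def outputsFor(shuffleId: Int): Map[Int, Location] =
      statuses.getOrElse(shuffleId, Map.empty[Int, Location])
  }

  // Expected handling: with unRegisterOutputOnHost = true, a fetch failure reported
  // against ANY executor on a host drops the outputs of ALL executors on that host.
  def onFetchFailure(tracker: OutputTracker, failedAt: Location,
      unRegisterOutputOnHost: Boolean): Unit = {
    if (unRegisterOutputOnHost) tracker.removeOutputsOnHost(failedAt.host)
    else tracker.removeOutputsOnExecutor(failedAt.executorId)
  }

  def main(args: Array[String]): Unit = {
    val tracker = new OutputTracker
    tracker.register(0, 0, Location("exec-hostA1", "hostA"))
    tracker.register(0, 1, Location("exec-hostA2", "hostA"))
    tracker.register(0, 2, Location("exec-hostB", "hostB"))
    onFetchFailure(tracker, Location("exec-hostA2", "hostA"), unRegisterOutputOnHost = true)
    // Only the hostB output should remain, matching the test's final assertions above.
    assert(tracker.outputsFor(0).values.map(_.executorId).toSet == Set("exec-hostB"))
  }
}
{code}

With host-level unregistration enabled, only the hostB output remains, which is exactly what the final assertions in the test expect. The reported problem is that the real scheduler does not reach this state when the executor has already been marked as lost before the fetch failure arrives.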

The error output shows that all three map statuses are still registered after the executor loss and the subsequent fetch failure, instead of only the one on hostB:
{code}

3 did not equal 1
ScalaTestFailureLocation: org.apache.spark.scheduler.DAGSchedulerSuite at (DAGSchedulerSuite.scala:609)
Expected :1
Actual   :3

org.scalatest.exceptions.TestFailedException: 3 did not equal 1

{code}





