You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2020/08/04 16:34:42 UTC

[spark] branch branch-3.0 updated: [SPARK-32003][CORE][3.0] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost

This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fd445cb  [SPARK-32003][CORE][3.0] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
fd445cb is described below

commit fd445cb46ce940fbc68f85d2db14367d9f58739a
Author: Wing Yew Poon <wy...@cloudera.com>
AuthorDate: Tue Aug 4 11:33:56 2020 -0500

    [SPARK-32003][CORE][3.0] When external shuffle service is used, unregister outputs for executor on fetch failure after executor is lost
    
    ### What changes were proposed in this pull request?
    
    If an executor is lost, the `DAGScheduler` handles the executor loss by removing the executor but does not unregister its outputs if the external shuffle service is used. However, if the node on which the executor runs is lost, the shuffle service may not be able to serve the shuffle files.
    In such a case, when fetches from the executor's outputs fail in the same stage, the `DAGScheduler` again removes the executor and, by rights, should unregister its outputs. It does not, because the epoch used to track the executor failure has not increased.
    
    We now separately track the epoch of executor failures that resulted in lost shuffle files, so we can unregister the outputs in this scenario. The idea to track a second epoch is due to Attila Zsolt Piros.
    
    ### Why are the changes needed?
    
    Without the changes, the loss of a node could require two stage attempts to recover instead of one.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test. This test fails without the change and passes with it.
    
    Closes #29193 from wypoon/SPARK-32003-3.0.
    
    Authored-by: Wing Yew Poon <wy...@cloudera.com>
    Signed-off-by: Imran Rashid <ir...@cloudera.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 100 +++++++++++++-------
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 101 +++++++++++++++++----
 2 files changed, 151 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index ec6eff3..51445bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -168,13 +168,34 @@ private[spark] class DAGScheduler(
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
-  // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
-  // every task. When we detect a node failing, we note the current epoch number and failed
-  // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
-  //
-  // TODO: Garbage collect information about failure epochs when we know there are no more
-  //       stray messages to detect.
-  private val failedEpoch = new HashMap[String, Long]
+  /**
+   * Tracks the latest epoch of a fully processed error related to the given executor. (We use
+   * the MapOutputTracker's epoch number, which is sent with every task.)
+   *
+   * When an executor fails, it can affect the results of many tasks, and we have to deal with
+   * all of them consistently. We don't simply ignore all future results from that executor,
+   * as the failures may have been transient; but we also don't want to "overreact" to follow-
+   * on errors we receive. Furthermore, we might receive notification of a task success, after
+   * we find out the executor has actually failed; we'll assume those successes are, in fact,
+   * simply delayed notifications and the results have been lost, if the tasks started in the
+   * same or an earlier epoch. In particular, we use this to control when we tell the
+   * BlockManagerMaster that the BlockManager has been lost.
+   */
+  private val executorFailureEpoch = new HashMap[String, Long]
+
+  /**
+   * Tracks the latest epoch of a fully processed error where shuffle files have been lost from
+   * the given executor.
+   *
+   * This is closely related to executorFailureEpoch. They only differ for the executor when
+   * there is an external shuffle service serving shuffle files and we haven't been notified that
+   * the entire worker has been lost. In that case, when an executor is lost, we do not update
+   * the shuffleFileLostEpoch; we wait for a fetch failure. This way, if only the executor
+   * fails, we do not unregister the shuffle data as it can still be served; but if there is
+   * a failure in the shuffle service (resulting in fetch failure), we unregister the shuffle
+   * data only once, even if we get many fetch failures.
+   */
+  private val shuffleFileLostEpoch = new HashMap[String, Long]
 
   private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
 
@@ -1472,7 +1493,8 @@ private[spark] class DAGScheduler(
             val status = event.result.asInstanceOf[MapStatus]
             val execId = status.location.executorId
             logDebug("ShuffleMapTask finished on " + execId)
-            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
+            if (executorFailureEpoch.contains(execId) &&
+                smt.epoch <= executorFailureEpoch(execId)) {
               logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
             } else {
               // The epoch of the task is acceptable (i.e., the task was launched after the most
@@ -1818,12 +1840,8 @@ private[spark] class DAGScheduler(
    * modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
    *
    * We will also assume that we've lost all shuffle blocks associated with the executor if the
-   * executor serves its own blocks (i.e., we're not using external shuffle), the entire slave
-   * is lost (likely including the shuffle service), or a FetchFailed occurred, in which case we
-   * presume all shuffle data related to this executor to be lost.
-   *
-   * Optionally the epoch during which the failure was caught can be passed to avoid allowing
-   * stray fetch failures from possibly retriggering the detection of a node as lost.
+   * executor serves its own blocks (i.e., we're not using an external shuffle service), or the
+   * entire Standalone worker is lost.
    */
   private[scheduler] def handleExecutorLost(
       execId: String,
@@ -1839,29 +1857,44 @@ private[spark] class DAGScheduler(
       maybeEpoch = None)
   }
 
+  /**
+   * Handles removing an executor from the BlockManagerMaster as well as unregistering shuffle
+   * outputs for the executor or optionally its host.
+   *
+   * @param execId executor to be removed
+   * @param fileLost If true, indicates that we assume we've lost all shuffle blocks associated
+   *   with the executor; this happens if the executor serves its own blocks (i.e., we're not
+   *   using an external shuffle service), the entire Standalone worker is lost, or a FetchFailed
+   *   occurred (in which case we presume all shuffle data related to this executor to be lost).
+   * @param hostToUnregisterOutputs (optional) executor host if we're unregistering all the
+   *   outputs on the host
+   * @param maybeEpoch (optional) the epoch during which the failure was caught (this prevents
+   *   reprocessing for follow-on fetch failures)
+   */
   private def removeExecutorAndUnregisterOutputs(
       execId: String,
       fileLost: Boolean,
       hostToUnregisterOutputs: Option[String],
       maybeEpoch: Option[Long] = None): Unit = {
     val currentEpoch = maybeEpoch.getOrElse(mapOutputTracker.getEpoch)
-    if (!failedEpoch.contains(execId) || failedEpoch(execId) < currentEpoch) {
-      failedEpoch(execId) = currentEpoch
-      logInfo("Executor lost: %s (epoch %d)".format(execId, currentEpoch))
+    logDebug(s"Considering removal of executor $execId; " +
+      s"fileLost: $fileLost, currentEpoch: $currentEpoch")
+    if (!executorFailureEpoch.contains(execId) || executorFailureEpoch(execId) < currentEpoch) {
+      executorFailureEpoch(execId) = currentEpoch
+      logInfo(s"Executor lost: $execId (epoch $currentEpoch)")
       blockManagerMaster.removeExecutor(execId)
-      if (fileLost) {
-        hostToUnregisterOutputs match {
-          case Some(host) =>
-            logInfo("Shuffle files lost for host: %s (epoch %d)".format(host, currentEpoch))
-            mapOutputTracker.removeOutputsOnHost(host)
-          case None =>
-            logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
-            mapOutputTracker.removeOutputsOnExecutor(execId)
-        }
-        clearCacheLocs()
-
-      } else {
-        logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
+      clearCacheLocs()
+    }
+    if (fileLost &&
+        (!shuffleFileLostEpoch.contains(execId) || shuffleFileLostEpoch(execId) < currentEpoch)) {
+      shuffleFileLostEpoch(execId) = currentEpoch
+      hostToUnregisterOutputs match {
+        case Some(host) =>
+          logInfo(s"Shuffle files lost for host: $host (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnHost(host)
+        case None =>
+          logInfo(s"Shuffle files lost for executor: $execId (epoch $currentEpoch)")
+          mapOutputTracker.removeOutputsOnExecutor(execId)
       }
     }
   }
@@ -1887,11 +1920,12 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler] def handleExecutorAdded(execId: String, host: String): Unit = {
-    // remove from failedEpoch(execId) ?
-    if (failedEpoch.contains(execId)) {
+    // remove from executorFailureEpoch(execId) ?
+    if (executorFailureEpoch.contains(execId)) {
       logInfo("Host added was in lost list earlier: " + host)
-      failedEpoch -= execId
+      executorFailureEpoch -= execId
     }
+    shuffleFileLostEpoch -= execId
   }
 
   private[scheduler] def handleStageCancellation(stageId: Int, reason: Option[String]): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 02bc216..d92bd5b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -25,6 +25,9 @@ import scala.annotation.meta.param
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 import scala.util.control.NonFatal
 
+import org.mockito.Mockito.spy
+import org.mockito.Mockito.times
+import org.mockito.Mockito.verify
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedException
 import org.scalatest.time.SpanSugar._
@@ -232,6 +235,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   var sparkListener: EventInfoRecordingListener = null
 
+  var blockManagerMaster: BlockManagerMaster = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -245,17 +249,18 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, null, conf, true) {
-      override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
-        blockIds.map {
-          _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
-            getOrElse(Seq())
-        }.toIndexedSeq
-      }
-      override def removeExecutor(execId: String): Unit = {
-        // don't need to propagate to the driver, which we don't have
-      }
+  class MyBlockManagerMaster(conf: SparkConf) extends BlockManagerMaster(null, null, conf, true) {
+    override def getLocations(blockIds: Array[BlockId]): IndexedSeq[Seq[BlockManagerId]] = {
+      blockIds.map {
+        _.asRDDId.map { id => (id.rddId -> id.splitIndex)
+        }.flatMap { key => cacheLocations.get(key)
+        }.getOrElse(Seq())
+      }.toIndexedSeq
     }
+    override def removeExecutor(execId: String): Unit = {
+      // don't need to propagate to the driver, which we don't have
+    }
+  }
 
   /** The list of results that DAGScheduler has collected. */
   val results = new HashMap[Int, Any]()
@@ -273,6 +278,16 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     override def jobFailed(exception: Exception): Unit = { failure = exception }
   }
 
+  class MyMapOutputTrackerMaster(
+      conf: SparkConf,
+      broadcastManager: BroadcastManager)
+    extends MapOutputTrackerMaster(conf, broadcastManager, true) {
+
+    override def sendTracker(message: Any): Unit = {
+      // no-op, just so we can stop this to avoid leaking threads
+    }
+  }
+
   override def beforeEach(): Unit = {
     super.beforeEach()
     init(new SparkConf())
@@ -290,11 +305,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     results.clear()
     securityMgr = new SecurityManager(conf)
     broadcastManager = new BroadcastManager(true, conf, securityMgr)
-    mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true) {
-      override def sendTracker(message: Any): Unit = {
-        // no-op, just so we can stop this to avoid leaking threads
-      }
-    }
+    mapOutputTracker = spy(new MyMapOutputTrackerMaster(conf, broadcastManager))
+    blockManagerMaster = spy(new MyBlockManagerMaster(conf))
     scheduler = new DAGScheduler(
       sc,
       taskScheduler,
@@ -537,6 +549,59 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(mapStatus2(2).location.host === "hostB")
   }
 
+  test("SPARK-32003: All shuffle files for executor should be cleaned up on fetch failure") {
+    // reset the test context with the right shuffle service config
+    afterEach()
+    val conf = new SparkConf()
+    conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
+    init(conf)
+
+    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
+    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(3))
+    val shuffleId = shuffleDep.shuffleId
+    val reduceRdd = new MyRDD(sc, 3, List(shuffleDep), tracker = mapOutputTracker)
+
+    submit(reduceRdd, Array(0, 1, 2))
+    // Map stage completes successfully,
+    // two tasks are run on an executor on hostA and one on an executor on hostB
+    complete(taskSets(0), Seq(
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostA", 3)),
+      (Success, makeMapStatus("hostB", 3))))
+    // Now the executor on hostA is lost
+    runEvent(ExecutorLost("exec-hostA", ExecutorExited(-100, false, "Container marked as failed")))
+    // Executor is removed but shuffle files are not unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
+    verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")
+
+    // The MapOutputTracker has all the shuffle files
+    val mapStatuses = mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses
+    assert(mapStatuses.count(_ != null) === 3)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 2)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)
+
+    // Now a fetch failure from the lost executor occurs
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)
+    ))
+    // blockManagerMaster.removeExecutor is not called again
+    // but shuffle files are unregistered
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
+
+    // Shuffle files for exec-hostA should be lost
+    assert(mapStatuses.count(_ != null) === 1)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostA") === 0)
+    assert(mapStatuses.count(s => s != null && s.location.executorId == "exec-hostB") === 1)
+
+    // Additional fetch failure from the executor does not result in further call to
+    // mapOutputTracker.removeOutputsOnExecutor
+    complete(taskSets(1), Seq(
+      (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 1, 0, "ignored"), null)
+    ))
+    verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
+  }
+
   test("zero split job") {
     var numResults = 0
     var failureReason: Option[Exception] = None
@@ -762,8 +827,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     complete(taskSets(1), Seq(
       (Success, 42),
       (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0L, 0, 0, "ignored"), null)))
-    // this will get called
-    // blockManagerMaster.removeExecutor("exec-hostA")
+    verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
@@ -806,11 +870,14 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
       runEvent(ExecutorLost("exec-hostA", event))
+      verify(blockManagerMaster, times(1)).removeExecutor("exec-hostA")
       if (expectFileLoss) {
+        verify(mapOutputTracker, times(1)).removeOutputsOnExecutor("exec-hostA")
         intercept[MetadataFetchFailedException] {
           mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0)
         }
       } else {
+        verify(mapOutputTracker, times(0)).removeOutputsOnExecutor("exec-hostA")
         assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
           HashSet(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org