You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2015/12/09 18:52:30 UTC

spark git commit: [SPARK-10582][YARN][CORE] Fix AM failure situation for dynamic allocation

Repository: spark
Updated Branches:
  refs/heads/master 22b9a8740 -> 6900f0173


[SPARK-10582][YARN][CORE] Fix AM failure situation for dynamic allocation

After an AM failure, the target number of executors tracked by the driver and by the AM can differ, which leads to unexpected behavior in dynamic allocation. So when the AM re-registers with the driver, the state in `ExecutorAllocationManager` and `CoarseGrainedSchedulerBackend` should be reset.

This issue was originally addressed in #8737 and is re-opened here. Thanks a lot to KaiXinXiaoLei for finding this issue.

andrewor14 and vanzin, would you please help review this? Thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #9963 from jerryshao/SPARK-10582.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6900f017
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6900f017
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6900f017

Branch: refs/heads/master
Commit: 6900f0173790ad2fa4c79a426bd2dec2d149daa2
Parents: 22b9a87
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Dec 9 09:50:43 2015 -0800
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Wed Dec 9 09:52:03 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 18 ++++-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 19 +++++
 .../spark/ExecutorAllocationManagerSuite.scala  | 84 ++++++++++++++++++++
 .../cluster/YarnSchedulerBackend.scala          | 23 ++++++
 4 files changed, 142 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6900f017/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 34c32ce..6176e25 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -89,6 +89,8 @@ private[spark] class ExecutorAllocationManager(
   private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
   private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
     Integer.MAX_VALUE)
+  private val initialNumExecutors = conf.getInt("spark.dynamicAllocation.initialExecutors",
+    minNumExecutors)
 
   // How long there must be backlogged tasks for before an addition is triggered (seconds)
   private val schedulerBacklogTimeoutS = conf.getTimeAsSeconds(
@@ -121,8 +123,7 @@ private[spark] class ExecutorAllocationManager(
 
   // The desired number of executors at this moment in time. If all our executors were to die, this
   // is the number of executors we would immediately want from the cluster manager.
-  private var numExecutorsTarget =
-    conf.getInt("spark.dynamicAllocation.initialExecutors", minNumExecutors)
+  private var numExecutorsTarget = initialNumExecutors
 
   // Executors that have been requested to be removed but have not been killed yet
   private val executorsPendingToRemove = new mutable.HashSet[String]
@@ -241,6 +242,19 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
+   * Reset the allocation manager to the initial state. Currently this will only be called in
+   * yarn-client mode when AM re-registers after a failure.
+   */
+  def reset(): Unit = synchronized {
+    initializing = true
+    numExecutorsTarget = initialNumExecutors
+    numExecutorsToAdd = 1
+
+    executorsPendingToRemove.clear()
+    removeTimes.clear()
+  }
+
+  /**
    * The maximum number of executors we would need under the current load to satisfy all running
    * and pending tasks, rounded up.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/6900f017/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 505c161..7efe167 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -341,6 +341,25 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
+  /**
+   * Reset the state of CoarseGrainedSchedulerBackend to the initial state. Currently it will only
+   * be called in yarn-client mode when the AM re-registers after a failure, and it only takes
+   * effect when dynamic allocation is enabled.
+   * */
+  protected def reset(): Unit = synchronized {
+    if (Utils.isDynamicAllocationEnabled(conf)) {
+      numPendingExecutors = 0
+      executorsPendingToRemove.clear()
+
+      // Remove all the lingering executors that should be removed but not yet. The reason might be
+      // because (1) disconnected event is not yet received; (2) executors die silently.
+      executorDataMap.toMap.foreach { case (eid, _) =>
+        driverEndpoint.askWithRetry[Boolean](
+          RemoveExecutor(eid, SlaveLost("Stale executor after cluster manager re-registered.")))
+      }
+    }
+  }
+
   override def reviveOffers() {
     driverEndpoint.send(ReviveOffers)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/6900f017/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 116f027..fedfbd5 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -805,6 +805,90 @@ class ExecutorAllocationManagerSuite
     assert(maxNumExecutorsNeeded(manager) === 1)
   }
 
+  test("reset the state of allocation manager") {
+    sc = createSparkContext()
+    val manager = sc.executorAllocationManager.get
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+
+    // Allocation manager is reset when adding executor requests are sent without reporting back
+    // executor added.
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) === 2)
+    assert(addExecutors(manager) === 2)
+    assert(numExecutorsTarget(manager) === 4)
+    assert(addExecutors(manager) === 1)
+    assert(numExecutorsTarget(manager) === 5)
+
+    manager.reset()
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorIds(manager) === Set.empty)
+
+    // Allocation manager is reset when executors are added.
+    sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10)))
+
+    addExecutors(manager)
+    addExecutors(manager)
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 5)
+
+    onExecutorAdded(manager, "first")
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    onExecutorAdded(manager, "fifth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+    // Cluster manager lost will make all the live executors lost, so here simulate this behavior
+    onExecutorRemoved(manager, "first")
+    onExecutorRemoved(manager, "second")
+    onExecutorRemoved(manager, "third")
+    onExecutorRemoved(manager, "fourth")
+    onExecutorRemoved(manager, "fifth")
+
+    manager.reset()
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorIds(manager) === Set.empty)
+    assert(removeTimes(manager) === Map.empty)
+
+    // Allocation manager is reset when executors are pending to remove
+    addExecutors(manager)
+    addExecutors(manager)
+    addExecutors(manager)
+    assert(numExecutorsTarget(manager) === 5)
+
+    onExecutorAdded(manager, "first")
+    onExecutorAdded(manager, "second")
+    onExecutorAdded(manager, "third")
+    onExecutorAdded(manager, "fourth")
+    onExecutorAdded(manager, "fifth")
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+    removeExecutor(manager, "first")
+    removeExecutor(manager, "second")
+    assert(executorsPendingToRemove(manager) === Set("first", "second"))
+    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+
+
+    // Cluster manager lost will make all the live executors lost, so here simulate this behavior
+    onExecutorRemoved(manager, "first")
+    onExecutorRemoved(manager, "second")
+    onExecutorRemoved(manager, "third")
+    onExecutorRemoved(manager, "fourth")
+    onExecutorRemoved(manager, "fifth")
+
+    manager.reset()
+
+    assert(numExecutorsTarget(manager) === 1)
+    assert(numExecutorsToAdd(manager) === 1)
+    assert(executorsPendingToRemove(manager) === Set.empty)
+    assert(removeTimes(manager) === Map.empty)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,

http://git-wip-us.apache.org/repos/asf/spark/blob/6900f017/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index e3dd877..1431bce 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -60,6 +60,9 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Scheduler extension services. */
   private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
 
+  // Flag to specify whether this schedulerBackend should be reset.
+  private var shouldResetOnAmRegister = false
+
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -156,6 +159,16 @@ private[spark] abstract class YarnSchedulerBackend(
   }
 
   /**
+   * Reset the state of SchedulerBackend to the initial state. This happens when the AM fails
+   * and a new AM re-registers itself with the driver. The stale state in the driver should be
+   * cleaned up.
+   */
+  override protected def reset(): Unit = {
+    super.reset()
+    sc.executorAllocationManager.foreach(_.reset())
+  }
+
+  /**
    * Override the DriverEndpoint to add extra logic for the case when an executor is disconnected.
    * This endpoint communicates with the executors and queries the AM for an executor's exit
    * status when the executor is disconnected.
@@ -218,6 +231,8 @@ private[spark] abstract class YarnSchedulerBackend(
         case None =>
           logWarning("Attempted to check for an executor loss reason" +
             " before the AM has registered!")
+          driverEndpoint.askWithRetry[Boolean](
+            RemoveExecutor(executorId, SlaveLost("AM is not yet registered.")))
       }
     }
 
@@ -225,6 +240,13 @@ private[spark] abstract class YarnSchedulerBackend(
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
         amEndpoint = Option(am)
+        if (!shouldResetOnAmRegister) {
+          shouldResetOnAmRegister = true
+        } else {
+          // AM is already registered before, this potentially means that AM failed and
+          // a new one registered after the failure. This will only happen in yarn-client mode.
+          reset()
+        }
 
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
@@ -270,6 +292,7 @@ private[spark] abstract class YarnSchedulerBackend(
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
       if (amEndpoint.exists(_.address == remoteAddress)) {
         logWarning(s"ApplicationMaster has disassociated: $remoteAddress")
+        amEndpoint = None
       }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org