You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2015/07/21 18:35:55 UTC

spark git commit: [SPARK-9193] Avoid assigning tasks to "lost" executor(s)

Repository: spark
Updated Branches:
  refs/heads/master df4ddb312 -> 6592a6058


[SPARK-9193] Avoid assigning tasks to "lost" executor(s)

Now, when some executors are killed by dynamic-allocation, it leads to some mis-assignment onto lost executors sometimes. Such kind of mis-assignment causes task failure(s) or even job failure if it repeats that errors for 4 times.

The root cause is that ***killExecutors*** doesn't remove those executors under killing ASAP. It depends on the ***OnDisassociated*** event to refresh the active working list later. The delay time really depends on your cluster status (from several milliseconds to sub-minute). When new tasks to be scheduled during that period of time, it will be assigned to those "active" but "under killing" executors. Then the tasks will be failed due to "executor lost". The better way is to exclude those executors under killing in the makeOffers(). Then all those tasks won't be allocated onto those executors "to be lost" any more.

Author: Grace <ji...@intel.com>

Closes #7528 from GraceH/AssignToLostExecutor and squashes the following commits:

ecc1da6 [Grace] scala style fix
6e2ed96 [Grace] Re-word makeOffers by more readable lines
b5546ce [Grace] Add comments about the fix
30a9ad0 [Grace] Avoid assigning tasks to lost executors


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6592a605
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6592a605
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6592a605

Branch: refs/heads/master
Commit: 6592a6058eee6a27a5c91281ca19076284d62483
Parents: df4ddb3
Author: Grace <ji...@intel.com>
Authored: Tue Jul 21 11:35:49 2015 -0500
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Tue Jul 21 11:35:49 2015 -0500

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6592a605/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f14c603..c65b3e5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -169,9 +169,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on all executors
     private def makeOffers() {
-      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
+      // Filter out executors under killing
+      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
+      val workOffers = activeExecutors.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
-      }.toSeq))
+      }.toSeq
+      launchTasks(scheduler.resourceOffers(workOffers))
     }
 
     override def onDisconnected(remoteAddress: RpcAddress): Unit = {
@@ -181,9 +184,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
-      val executorData = executorDataMap(executorId)
-      launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
+      // Filter out executors under killing
+      if (!executorsPendingToRemove.contains(executorId)) {
+        val executorData = executorDataMap(executorId)
+        val workOffers = Seq(
+          new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
+        launchTasks(scheduler.resourceOffers(workOffers))
+      }
     }
 
     // Launch tasks returned by a set of resource offers


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org