You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2014/03/05 23:00:34 UTC

git commit: SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor

Repository: spark
Updated Branches:
  refs/heads/master 02836657c -> a3da50881


SPARK-1171: when executor is removed, we should minus totalCores instead of just freeCores on that executor

https://spark-project.atlassian.net/browse/SPARK-1171

When the executor is removed, the current implementation will only minus the freeCores of that executor. Actually we should minus the totalCores...

Author: CodingCat <zh...@gmail.com>
Author: Nan Zhu <Co...@users.noreply.github.com>

Closes #63 from CodingCat/simplify_CoarseGrainedSchedulerBackend and squashes the following commits:

f6bf93f [Nan Zhu] code clean
19c2bb4 [CodingCat] use copy idiom to reconstruct the workerOffers
43c13e9 [CodingCat] keep WorkerOffer immutable
af470d3 [CodingCat] style fix
0c0e409 [CodingCat] simplify the implementation of CoarseGrainedSchedulerBackend


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3da5088
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3da5088
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3da5088

Branch: refs/heads/master
Commit: a3da5088195eea7d90b37feee5dd2a372fcd9ace
Parents: 0283665
Author: CodingCat <zh...@gmail.com>
Authored: Wed Mar 5 14:00:28 2014 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Wed Mar 5 14:00:28 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/WorkerOffer.scala  | 2 +-
 .../scheduler/cluster/CoarseGrainedSchedulerBackend.scala    | 8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3da5088/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index ba6bab3..810b36c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -21,4 +21,4 @@ package org.apache.spark.scheduler
  * Represents free resources available on an executor.
  */
 private[spark]
-class WorkerOffer(val executorId: String, val host: String, val cores: Int)
+case class WorkerOffer(executorId: String, host: String, cores: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/a3da5088/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 379e02e..fad0373 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -54,6 +54,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     private val executorAddress = new HashMap[String, Address]
     private val executorHost = new HashMap[String, String]
     private val freeCores = new HashMap[String, Int]
+    private val totalCores = new HashMap[String, Int]
     private val addressToExecutorId = new HashMap[Address, String]
 
     override def preStart() {
@@ -76,6 +77,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           sender ! RegisteredExecutor(sparkProperties)
           executorActor(executorId) = sender
           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
+          totalCores(executorId) = cores
           freeCores(executorId) = cores
           executorAddress(executorId) = sender.path.address
           addressToExecutorId(sender.path.address) = executorId
@@ -147,10 +149,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def removeExecutor(executorId: String, reason: String) {
       if (executorActor.contains(executorId)) {
         logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = freeCores(executorId)
-        addressToExecutorId -= executorAddress(executorId)
+        val numCores = totalCores(executorId)
         executorActor -= executorId
         executorHost -= executorId
+        addressToExecutorId -= executorAddress(executorId)
+        executorAddress -= executorId
+        totalCores -= executorId
         freeCores -= executorId
         totalCoreCount.addAndGet(-numCores)
         scheduler.executorLost(executorId, SlaveLost(reason))