You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/28 07:08:13 UTC

git commit: SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend variables together

Repository: spark
Updated Branches:
  refs/heads/master 248232936 -> 9966d1a8a


SPARK-CORE [SPARK-3651] Group common CoarseGrainedSchedulerBackend variables together

from [SPARK-3651]
In CoarseGrainedSchedulerBackend, we have:

    private val executorActor = new HashMap[String, ActorRef]
    private val executorAddress = new HashMap[String, Address]
    private val executorHost = new HashMap[String, String]
    private val freeCores = new HashMap[String, Int]
    private val totalCores = new HashMap[String, Int]

We only ever put / remove stuff from these maps together. It would simplify the code if we consolidate these all into one map as we have done in JobProgressListener in https://issues.apache.org/jira/browse/SPARK-2299.

Author: Dale <ti...@outlook.com>

Closes #2533 from tigerquoll/SPARK-3651 and squashes the following commits:

d1be0a9 [Dale] [SPARK-3651]  implemented suggested changes. Changed a reference from executorInfo to executorData to be consistent with other usages
6890663 [Dale] [SPARK-3651]  implemented suggested changes
7d671cf [Dale] [SPARK-3651]  Grouped variables under an ExecutorData object, and reference them via a single map entry, as they are all retrieved under the same key


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9966d1a8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9966d1a8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9966d1a8

Branch: refs/heads/master
Commit: 9966d1a8aaed3d8cfed93855959705ea3c677215
Parents: 2482329
Author: Dale <ti...@outlook.com>
Authored: Sat Sep 27 22:08:10 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Sep 27 22:08:10 2014 -0700

----------------------------------------------------------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala | 68 +++++++++-----------
 .../spark/scheduler/cluster/ExecutorData.scala  | 38 +++++++++++
 2 files changed, 68 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9966d1a8/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9a0cb1c..59e15ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -62,15 +62,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
   val createTime = System.currentTimeMillis()
 
   class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {
-
     override protected def log = CoarseGrainedSchedulerBackend.this.log
-
-    private val executorActor = new HashMap[String, ActorRef]
-    private val executorAddress = new HashMap[String, Address]
-    private val executorHost = new HashMap[String, String]
-    private val freeCores = new HashMap[String, Int]
-    private val totalCores = new HashMap[String, Int]
     private val addressToExecutorId = new HashMap[Address, String]
+    private val executorDataMap = new HashMap[String, ExecutorData]
 
     override def preStart() {
       // Listen for remote client disconnection events, since they don't go through Akka's watch()
@@ -85,16 +79,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     def receiveWithLogging = {
       case RegisterExecutor(executorId, hostPort, cores) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
-        if (executorActor.contains(executorId)) {
+        if (executorDataMap.contains(executorId)) {
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
           sender ! RegisteredExecutor
-          executorActor(executorId) = sender
-          executorHost(executorId) = Utils.parseHostPort(hostPort)._1
-          totalCores(executorId) = cores
-          freeCores(executorId) = cores
-          executorAddress(executorId) = sender.path.address
+          executorDataMap.put(executorId, new ExecutorData(sender, sender.path.address,
+            Utils.parseHostPort(hostPort)._1, cores, cores))
+
           addressToExecutorId(sender.path.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -104,13 +96,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case StatusUpdate(executorId, taskId, state, data) =>
         scheduler.statusUpdate(taskId, state, data.value)
         if (TaskState.isFinished(state)) {
-          if (executorActor.contains(executorId)) {
-            freeCores(executorId) += scheduler.CPUS_PER_TASK
-            makeOffers(executorId)
-          } else {
-            // Ignoring the update since we don't know about the executor.
-            val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
-            logWarning(msg.format(taskId, state, sender, executorId))
+          executorDataMap.get(executorId) match {
+            case Some(executorData) =>
+              executorData.freeCores += scheduler.CPUS_PER_TASK // finished task returns its cores
+              makeOffers(executorId)
+            case None =>
+              // Ignoring the update since we don't know about the executor.
+              logWarning(s"Ignored task status update ($taskId state $state) " +
+                s"from unknown executor $sender with ID $executorId")
           }
         }
 
@@ -118,7 +111,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
         makeOffers()
 
       case KillTask(taskId, executorId, interruptThread) =>
-        executorActor(executorId) ! KillTask(taskId, executorId, interruptThread)
+        executorDataMap(executorId).executorActor ! KillTask(taskId, executorId, interruptThread)
 
       case StopDriver =>
         sender ! true
@@ -126,8 +119,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
 
       case StopExecutors =>
         logInfo("Asking each executor to shut down")
-        for (executor <- executorActor.values) {
-          executor ! StopExecutor
+        for ((_, executorData) <- executorDataMap) {
+          executorData.executorActor ! StopExecutor
         }
         sender ! true
 
@@ -138,6 +131,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       case AddWebUIFilter(filterName, filterParams, proxyBase) =>
         addWebUIFilter(filterName, filterParams, proxyBase)
         sender ! true
+
       case DisassociatedEvent(_, address, _) =>
         addressToExecutorId.get(address).foreach(removeExecutor(_,
           "remote Akka client disassociated"))
@@ -149,13 +143,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
     // Make fake resource offers on all executors
     def makeOffers() {
       launchTasks(scheduler.resourceOffers(
-        executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
+        executorDataMap.map {case (id, executorData) =>
+          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toSeq))
     }
 
     // Make fake resource offers on just one executor
     def makeOffers(executorId: String) {
+      val executorData = executorDataMap(executorId)
       launchTasks(scheduler.resourceOffers(
-        Seq(new WorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
+        Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
     }
 
     // Launch tasks returned by a set of resource offers
@@ -179,25 +175,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           }
         }
         else {
-          freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
-          executorActor(task.executorId) ! LaunchTask(new SerializableBuffer(serializedTask))
+          val executorData = executorDataMap(task.executorId)
+          executorData.freeCores -= scheduler.CPUS_PER_TASK
+          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
         }
       }
     }
 
     // Remove a disconnected slave from the cluster
     def removeExecutor(executorId: String, reason: String) {
-      if (executorActor.contains(executorId)) {
-        logInfo("Executor " + executorId + " disconnected, so removing it")
-        val numCores = totalCores(executorId)
-        executorActor -= executorId
-        executorHost -= executorId
-        addressToExecutorId -= executorAddress(executorId)
-        executorAddress -= executorId
-        totalCores -= executorId
-        freeCores -= executorId
-        totalCoreCount.addAndGet(-numCores)
-        scheduler.executorLost(executorId, SlaveLost(reason))
+      executorDataMap.remove(executorId) match { // remove() hands back the dropped entry, if any
+        case Some(executorData) =>
+          addressToExecutorId -= executorData.executorAddress // keep the reverse index in sync
+          totalCoreCount.addAndGet(-executorData.totalCores)
+          scheduler.executorLost(executorId, SlaveLost(reason))
+        case None => logError(s"Asked to remove non existant executor $executorId")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9966d1a8/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
new file mode 100644
index 0000000..74a9298
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import akka.actor.{Address, ActorRef}
+
+/**
+ * Grouping of the per-executor data that is accessed by CoarseGrainedSchedulerBackend.
+ * Instances of this class are stored in a Map keyed by executor ID.
+ *
+ * @param executorActor The actorRef representing this executor
+ * @param executorAddress The network address of this executor
+ * @param executorHost The hostname that this executor is running on
+ * @param freeCores  The current number of cores available for work on the executor (mutable)
+ * @param totalCores The total number of cores available to the executor
+ */
+private[cluster] class ExecutorData(
+   val executorActor: ActorRef,
+   val executorAddress: Address,
+   val executorHost: String ,
+   var freeCores: Int,
+   val totalCores: Int
+)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org