You are viewing a plain-text version of this content. (The link to the canonical version was lost when this copy was converted to plain text.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/18 01:10:04 UTC

[2/2] git commit: Minor cleanups to the Master code.

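Minor cleanups to the Master code: remove the cached Slave::resourcesInUse and Slave::resourcesOffered fields in favour of a computed Slave::used(), move task-removal logging into Master::removeTask() (warning when the task is in a non-terminal state), iterate over copies (utils::copy) of containers that statusUpdate() modifies, and pass task/offer resources to allocator->resourcesRecovered() without an explicit Resources(...) conversion.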

Review: https://reviews.apache.org/r/25566


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0760b007
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0760b007
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0760b007

Branch: refs/heads/master
Commit: 0760b007ad65bc91e8cea377339978c78d36d247
Parents: b621191
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:48:20 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 17 15:46:38 2014 -0700

----------------------------------------------------------------------
 src/master/http.cpp   |  2 +-
 src/master/master.cpp | 53 +++++++++++++++++++++++++++-------------------
 src/master/master.hpp | 36 +++++++++++++++++--------------
 3 files changed, 52 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6dd11fe..8db4d9a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -405,7 +405,7 @@ Future<Response> Master::Http::stats(const Request& request)
         totalResources += resource;
       }
     }
-    foreach (const Resource& resource, slave->resourcesInUse) {
+    foreach (const Resource& resource, slave->used()) {
       if (resource.type() == Value::SCALAR) {
         usedResources += resource;
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 11d75fb..52a7409 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3763,10 +3763,10 @@ void Master::reconcile(
   // missing from the slave. This could happen if the task was
   // dropped by the slave (e.g., slave exited before getting the
   // task or the task was launched while slave was in recovery).
-  // NOTE: keys() and values() are used since statusUpdate()
-  //       modifies slave->tasks.
-  foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
-    foreach (Task* task, slave->tasks[frameworkId].values()) {
+  // NOTE: copies are used because statusUpdate() modifies
+  //       slave->tasks.
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
         LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
                      << " of framework " << task->framework_id()
@@ -4010,10 +4010,11 @@ void Master::removeFramework(Framework* framework)
 
   // Remove the framework's offers (if they weren't removed before).
   foreach (Offer* offer, utils::copy(framework->offers)) {
-    allocator->resourcesRecovered(offer->framework_id(),
-                                  offer->slave_id(),
-                                  Resources(offer->resources()),
-                                  None());
+    allocator->resourcesRecovered(
+        offer->framework_id(),
+        offer->slave_id(),
+        offer->resources(),
+        None());
     removeOffer(offer);
   }
 
@@ -4078,9 +4079,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 
   // Remove pointers to framework's tasks in slaves, and send status
   // updates.
-  // NOTE: values() is used because statusUpdate() modifies
+  // NOTE: A copy is used because statusUpdate() modifies
   //       slave->tasks.
-  foreach (Task* task, slave->tasks[framework->id].values()) {
+  foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
     // Remove tasks that belong to this framework.
     if (task->framework_id() == framework->id) {
       // A framework might not actually exist because the master failed
@@ -4257,8 +4258,8 @@ void Master::removeSlave(Slave* slave)
   // updates. Rather, build up the updates so that we can send them
   // after the slave is removed from the registry.
   vector<StatusUpdate> updates;
-  foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
-    foreach (Task* task, slave->tasks[frameworkId].values()) {
+  foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+    foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       const StatusUpdate& update = protobuf::createStatusUpdate(
           task->framework_id(),
           task->slave_id(),
@@ -4372,6 +4373,21 @@ void Master::removeTask(Task* task)
 {
   CHECK_NOTNULL(task);
 
+  Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+
+  if (!protobuf::isTerminalState(task->state())) {
+    LOG(WARNING) << "Removing task " << task->task_id()
+                 << " with resources " << task->resources()
+                 << " of framework " << task->framework_id()
+                 << " on slave " << *slave
+                 << " in non-terminal state " << task->state();
+  } else {
+    LOG(INFO) << "Removing task " << task->task_id()
+              << " with resources " << task->resources()
+              << " of framework " << task->framework_id()
+              << " on slave " << *slave;
+  }
+
   // Remove from framework.
   Framework* framework = getFramework(task->framework_id());
   if (framework != NULL) { // A framework might not be re-connected yet.
@@ -4379,15 +4395,13 @@ void Master::removeTask(Task* task)
   }
 
   // Remove from slave.
-  Slave* slave = getSlave(task->slave_id());
-  CHECK_NOTNULL(slave);
   slave->removeTask(task);
 
   // Tell the allocator about the recovered resources.
   allocator->resourcesRecovered(
       task->framework_id(),
       task->slave_id(),
-      Resources(task->resources()),
+      task->resources(),
       None());
 
   // Update the task state metric.
@@ -4396,12 +4410,7 @@ void Master::removeTask(Task* task)
     case TASK_FAILED:   ++metrics.tasks_failed;   break;
     case TASK_KILLED:   ++metrics.tasks_killed;   break;
     case TASK_LOST:     ++metrics.tasks_lost;     break;
-    default:
-      LOG(WARNING) << "Removing task " << task->task_id()
-                   << " of framework " << task->framework_id()
-                   << " and slave " << task->slave_id()
-                   << " in non-terminal state " << task->state();
-      break;
+    default: break;
   }
 
   delete task;
@@ -4934,7 +4943,7 @@ double Master::_resources_used(const std::string& name)
   double used = 0.0;
 
   foreachvalue (Slave* slave, slaves.registered) {
-    foreach (const Resource& resource, slave->resourcesInUse) {
+    foreach (const Resource& resource, slave->used()) {
       if (resource.name() == name && resource.type() == Value::SCALAR) {
         used += resource.scalar().value();
       }

http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 54e3918..80d7535 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -839,7 +839,6 @@ struct Slave
     LOG(INFO) << "Adding task " << task->task_id()
               << " with resources " << task->resources()
               << " on slave " << id << " (" << info.hostname() << ")";
-    resourcesInUse += task->resources();
   }
 
   void removeTask(Task* task)
@@ -854,10 +853,6 @@ struct Slave
     }
 
     killedTasks.remove(task->framework_id(), task->task_id());
-    LOG(INFO) << "Removing task " << task->task_id()
-              << " with resources " << task->resources()
-              << " on slave " << id << " (" << info.hostname() << ")";
-    resourcesInUse -= task->resources();
   }
 
   void addOffer(Offer* offer)
@@ -867,7 +862,6 @@ struct Slave
     VLOG(1) << "Adding offer " << offer->id()
             << " with resources " << offer->resources()
             << " on slave " << id << " (" << info.hostname() << ")";
-    resourcesOffered += offer->resources();
   }
 
   void removeOffer(Offer* offer)
@@ -877,7 +871,6 @@ struct Slave
     VLOG(1) << "Removing offer " << offer->id()
             << " with resources " << offer->resources()
             << " on slave " << id << " (" << info.hostname() << ")";
-    resourcesOffered -= offer->resources();
   }
 
   bool hasExecutor(const FrameworkID& frameworkId,
@@ -895,9 +888,6 @@ struct Slave
       << " of framework " << frameworkId;
 
     executors[frameworkId][executorInfo.executor_id()] = executorInfo;
-
-    // Update the resources in use to reflect running this executor.
-    resourcesInUse += executorInfo.resources();
   }
 
   void removeExecutor(const FrameworkID& frameworkId,
@@ -906,15 +896,32 @@ struct Slave
     CHECK(hasExecutor(frameworkId, executorId))
       << "Unknown executor " << executorId << " of framework " << frameworkId;
 
-    // Update the resources in use to reflect removing this executor.
-    resourcesInUse -= executors[frameworkId][executorId].resources();
-
     executors[frameworkId].erase(executorId);
     if (executors[frameworkId].empty()) {
       executors.erase(frameworkId);
     }
   }
 
+  Resources used() const
+  {
+    Resources used;
+
+    foreachkey (const FrameworkID& frameworkId, tasks) {
+      foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
+        used += task->resources();
+      }
+    }
+
+    foreachkey (const FrameworkID& frameworkId, executors) {
+      foreachvalue (const ExecutorInfo& executorInfo,
+                    executors.find(frameworkId)->second) {
+        used += executorInfo.resources();
+      }
+    }
+
+    return used;
+  }
+
   const SlaveID id;
   const SlaveInfo info;
 
@@ -927,9 +934,6 @@ struct Slave
   // enabled because we expect it reregister after recovery.
   bool disconnected;
 
-  Resources resourcesOffered; // Resources offered.
-  Resources resourcesInUse;   // Resources used by tasks and executors.
-
   // Executors running on this slave.
   hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;