You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/18 01:10:04 UTC
[2/2] git commit: Minor cleanups to the Master code.
Minor cleanups to the Master code.
Review: https://reviews.apache.org/r/25566
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0760b007
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0760b007
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0760b007
Branch: refs/heads/master
Commit: 0760b007ad65bc91e8cea377339978c78d36d247
Parents: b621191
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:48:20 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 17 15:46:38 2014 -0700
----------------------------------------------------------------------
src/master/http.cpp | 2 +-
src/master/master.cpp | 53 +++++++++++++++++++++++++++-------------------
src/master/master.hpp | 36 +++++++++++++++++--------------
3 files changed, 52 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6dd11fe..8db4d9a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -405,7 +405,7 @@ Future<Response> Master::Http::stats(const Request& request)
totalResources += resource;
}
}
- foreach (const Resource& resource, slave->resourcesInUse) {
+ foreach (const Resource& resource, slave->used()) {
if (resource.type() == Value::SCALAR) {
usedResources += resource;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 11d75fb..52a7409 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3763,10 +3763,10 @@ void Master::reconcile(
// missing from the slave. This could happen if the task was
// dropped by the slave (e.g., slave exited before getting the
// task or the task was launched while slave was in recovery).
- // NOTE: keys() and values() are used since statusUpdate()
- // modifies slave->tasks.
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
+ // NOTE: copies are used because statusUpdate() modifies
+ // slave->tasks.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
<< " of framework " << task->framework_id()
@@ -4010,10 +4010,11 @@ void Master::removeFramework(Framework* framework)
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->resourcesRecovered(offer->framework_id(),
- offer->slave_id(),
- Resources(offer->resources()),
- None());
+ allocator->resourcesRecovered(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ None());
removeOffer(offer);
}
@@ -4078,9 +4079,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// Remove pointers to framework's tasks in slaves, and send status
// updates.
- // NOTE: values() is used because statusUpdate() modifies
+ // NOTE: A copy is used because statusUpdate() modifies
// slave->tasks.
- foreach (Task* task, slave->tasks[framework->id].values()) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
// Remove tasks that belong to this framework.
if (task->framework_id() == framework->id) {
// A framework might not actually exist because the master failed
@@ -4257,8 +4258,8 @@ void Master::removeSlave(Slave* slave)
// updates. Rather, build up the updates so that we can send them
// after the slave is removed from the registry.
vector<StatusUpdate> updates;
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
task->slave_id(),
@@ -4372,6 +4373,21 @@ void Master::removeTask(Task* task)
{
CHECK_NOTNULL(task);
+ Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+
+ if (!protobuf::isTerminalState(task->state())) {
+ LOG(WARNING) << "Removing task " << task->task_id()
+ << " with resources " << task->resources()
+ << " of framework " << task->framework_id()
+ << " on slave " << *slave
+ << " in non-terminal state " << task->state();
+ } else {
+ LOG(INFO) << "Removing task " << task->task_id()
+ << " with resources " << task->resources()
+ << " of framework " << task->framework_id()
+ << " on slave " << *slave;
+ }
+
// Remove from framework.
Framework* framework = getFramework(task->framework_id());
if (framework != NULL) { // A framework might not be re-connected yet.
@@ -4379,15 +4395,13 @@ void Master::removeTask(Task* task)
}
// Remove from slave.
- Slave* slave = getSlave(task->slave_id());
- CHECK_NOTNULL(slave);
slave->removeTask(task);
// Tell the allocator about the recovered resources.
allocator->resourcesRecovered(
task->framework_id(),
task->slave_id(),
- Resources(task->resources()),
+ task->resources(),
None());
// Update the task state metric.
@@ -4396,12 +4410,7 @@ void Master::removeTask(Task* task)
case TASK_FAILED: ++metrics.tasks_failed; break;
case TASK_KILLED: ++metrics.tasks_killed; break;
case TASK_LOST: ++metrics.tasks_lost; break;
- default:
- LOG(WARNING) << "Removing task " << task->task_id()
- << " of framework " << task->framework_id()
- << " and slave " << task->slave_id()
- << " in non-terminal state " << task->state();
- break;
+ default: break;
}
delete task;
@@ -4934,7 +4943,7 @@ double Master::_resources_used(const std::string& name)
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
- foreach (const Resource& resource, slave->resourcesInUse) {
+ foreach (const Resource& resource, slave->used()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
used += resource.scalar().value();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 54e3918..80d7535 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -839,7 +839,6 @@ struct Slave
LOG(INFO) << "Adding task " << task->task_id()
<< " with resources " << task->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesInUse += task->resources();
}
void removeTask(Task* task)
@@ -854,10 +853,6 @@ struct Slave
}
killedTasks.remove(task->framework_id(), task->task_id());
- LOG(INFO) << "Removing task " << task->task_id()
- << " with resources " << task->resources()
- << " on slave " << id << " (" << info.hostname() << ")";
- resourcesInUse -= task->resources();
}
void addOffer(Offer* offer)
@@ -867,7 +862,6 @@ struct Slave
VLOG(1) << "Adding offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesOffered += offer->resources();
}
void removeOffer(Offer* offer)
@@ -877,7 +871,6 @@ struct Slave
VLOG(1) << "Removing offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesOffered -= offer->resources();
}
bool hasExecutor(const FrameworkID& frameworkId,
@@ -895,9 +888,6 @@ struct Slave
<< " of framework " << frameworkId;
executors[frameworkId][executorInfo.executor_id()] = executorInfo;
-
- // Update the resources in use to reflect running this executor.
- resourcesInUse += executorInfo.resources();
}
void removeExecutor(const FrameworkID& frameworkId,
@@ -906,15 +896,32 @@ struct Slave
CHECK(hasExecutor(frameworkId, executorId))
<< "Unknown executor " << executorId << " of framework " << frameworkId;
- // Update the resources in use to reflect removing this executor.
- resourcesInUse -= executors[frameworkId][executorId].resources();
-
executors[frameworkId].erase(executorId);
if (executors[frameworkId].empty()) {
executors.erase(frameworkId);
}
}
+ // Returns the total resources in use on this slave. The value is
+ // computed on each call by summing the resources of every task in
+ // 'tasks' and of every executor in 'executors'.
+ Resources used() const
+ {
+ Resources total;
+
+ // Add up the resources of each framework's tasks.
+ // NOTE(review): the inner map is fetched via find() on the key,
+ // presumably because the nested map type contains a comma and cannot
+ // be named inside the foreach macros -- confirm before simplifying.
+ foreachkey (const FrameworkID& taskFrameworkId, tasks) {
+ foreachvalue (const Task* taskPtr, tasks.find(taskFrameworkId)->second) {
+ total += taskPtr->resources();
+ }
+ }
+
+ // Add up the resources of each framework's executors.
+ foreachkey (const FrameworkID& executorFrameworkId, executors) {
+ foreachvalue (const ExecutorInfo& executor,
+ executors.find(executorFrameworkId)->second) {
+ total += executor.resources();
+ }
+ }
+
+ return total;
+ }
+
const SlaveID id;
const SlaveInfo info;
@@ -927,9 +934,6 @@ struct Slave
// enabled because we expect it reregister after recovery.
bool disconnected;
- Resources resourcesOffered; // Resources offered.
- Resources resourcesInUse; // Resources used by tasks and executors.
-
// Executors running on this slave.
hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;