You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/18 01:10:03 UTC
[1/2] git commit: Added a removeExecutor helper in the Master.
Repository: mesos
Updated Branches:
refs/heads/master 82e886c91 -> 0760b007a
Added a removeExecutor helper in the Master.
Review: https://reviews.apache.org/r/25565
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b6211913
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b6211913
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b6211913
Branch: refs/heads/master
Commit: b62119134ca7c5020341ed2f3a991ebfaf8355ef
Parents: 82e886c
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Sep 10 14:34:30 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 17 15:46:37 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 213 ++++++++++++++++++++-------------------------
src/master/master.hpp | 40 +++++----
2 files changed, 121 insertions(+), 132 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b6211913/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 41dcc46..11d75fb 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -649,50 +649,29 @@ void Master::finalize()
{
LOG(INFO) << "Master terminating";
- // Remove the frameworks.
- // Note we are not deleting the pointers to the frameworks from the
- // allocator or the roles because it is unnecessary bookkeeping at
- // this point since we are shutting down.
- foreachvalue (Framework* framework, frameworks.registered) {
- // Remove pending tasks from the framework.
- framework->pendingTasks.clear();
-
- // Remove pointers to the framework's tasks in slaves.
- foreachvalue (Task* task, utils::copy(framework->tasks)) {
- Slave* slave = getSlave(task->slave_id());
- // Since we only find out about tasks when the slave re-registers,
- // it must be the case that the slave exists!
- CHECK(slave != NULL)
- << "Unknown slave " << task->slave_id()
- << " in the task " << task->task_id();
-
- removeTask(task);
+ // Remove the slaves.
+ foreachvalue (Slave* slave, slaves.registered) {
+ // Remove tasks.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
+ removeTask(task);
+ }
}
- // Remove the framework's offers (if they weren't removed before).
- foreach (Offer* offer, utils::copy(framework->offers)) {
- removeOffer(offer);
+ // Remove executors.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+ foreachkey (const ExecutorID& executorId,
+ utils::copy(slave->executors[frameworkId])) {
+ removeExecutor(slave, frameworkId, executorId);
+ }
}
- delete framework;
- }
- frameworks.registered.clear();
-
- CHECK_EQ(offers.size(), 0UL);
-
- foreachvalue (Slave* slave, slaves.registered) {
- // Remove tasks that are in the slave but not in any framework.
- // This could happen when the framework has yet to re-register
- // after master failover.
- // NOTE: keys() and values() are used because slave->tasks is
- // modified by removeTask()!
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
- removeTask(task);
- }
+ // Remove offers.
+ foreach (Offer* offer, utils::copy(slave->offers)) {
+ removeOffer(offer);
}
- // Kill the slave observer.
+ // Terminate the slave observer.
terminate(slave->observer);
wait(slave->observer);
@@ -701,6 +680,26 @@ void Master::finalize()
}
slaves.registered.clear();
+ // Remove the frameworks.
+ // Note we are not deleting the pointers to the frameworks from the
+ // allocator or the roles because it is unnecessary bookkeeping at
+ // this point since we are shutting down.
+ foreachvalue (Framework* framework, frameworks.registered) {
+ // Remove pending tasks from the framework.
+ framework->pendingTasks.clear();
+
+ // No tasks/executors/offers should remain since the slaves
+ // have been removed.
+ CHECK(framework->tasks.empty());
+ CHECK(framework->executors.empty());
+ CHECK(framework->offers.empty());
+
+ delete framework;
+ }
+ frameworks.registered.clear();
+
+ CHECK(offers.empty());
+
foreachvalue (Future<Nothing> future, authenticating) {
// NOTE: This is necessary during tests because a copy of
// this future is used to setup authentication timeout. If a
@@ -3282,33 +3281,22 @@ void Master::exitedExecutor(
Slave* slave = CHECK_NOTNULL(slaves.registered[slaveId]);
- // Tell the allocator about the recovered resources.
- if (slave->hasExecutor(frameworkId, executorId)) {
- ExecutorInfo executor = slave->executors[frameworkId][executorId];
-
- LOG(INFO) << "Executor " << executorId
- << " of framework " << frameworkId
- << " on slave " << *slave << " "
- << WSTRINGIFY(status);
-
- allocator->resourcesRecovered(
- frameworkId, slaveId, Resources(executor.resources()), None());
-
- // Remove executor from slave and framework.
- slave->removeExecutor(frameworkId, executorId);
- } else {
- LOG(WARNING) << "Ignoring unknown exited executor "
- << executorId << " on slave " << *slave;
+ if (!slave->hasExecutor(frameworkId, executorId)) {
+ LOG(WARNING) << "Ignoring unknown exited executor '" << executorId
+ << "' of framework " << frameworkId
+ << " on slave " << *slave;
+ return;
}
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- framework->removeExecutor(slave->id, executorId);
+ LOG(INFO) << "Executor " << executorId
+ << " of framework " << frameworkId
+ << " on slave " << *slave << " "
+ << WSTRINGIFY(status);
- // TODO(benh): Send the framework its executor's exit status?
- // Or maybe at least have something like
- // Scheduler::executorLost?
- }
+ removeExecutor(slave, frameworkId, executorId);
+
+ // TODO(benh): Send the framework its executor's exit status?
+ // Or maybe at least have something like Scheduler::executorLost?
}
@@ -3820,26 +3808,14 @@ void Master::reconcile(
foreachkey (const ExecutorID& executorId,
utils::copy(slave->executors[frameworkId])) {
if (!slaveExecutors.contains(frameworkId, executorId)) {
- LOG(WARNING) << "Removing executor " << executorId << " of framework "
- << frameworkId << " as it is unknown to the slave "
- << *slave;
-
- // TODO(bmahler): This is duplicated in several locations, we
- // may benefit from a method for removing an executor from
- // all the relevant data structures and the allocator, akin
- // to removeTask().
- allocator->resourcesRecovered(
- frameworkId,
- slave->id,
- slave->executors[frameworkId][executorId].resources(),
- None());
-
- slave->removeExecutor(frameworkId, executorId);
-
- if (frameworks.registered.contains(frameworkId)) {
- frameworks.registered[frameworkId]->removeExecutor(
- slave->id, executorId);
- }
+ // TODO(bmahler): Reconcile executors correctly between the
+ // master and the slave, see:
+ // MESOS-1466, MESOS-1800, and MESOS-1720.
+ LOG(WARNING) << "Executor " << executorId
+ << " of framework " << frameworkId
+ << " possibly unknown to the slave " << *slave;
+
+ removeExecutor(slave, frameworkId, executorId);
}
}
}
@@ -4045,15 +4021,9 @@ void Master::removeFramework(Framework* framework)
foreachkey (const SlaveID& slaveId, framework->executors) {
Slave* slave = getSlave(slaveId);
if (slave != NULL) {
- foreachpair (const ExecutorID& executorId,
- const ExecutorInfo& executorInfo,
- framework->executors[slaveId]) {
- allocator->resourcesRecovered(
- framework->id,
- slave->id,
- executorInfo.resources(),
- None());
- slave->removeExecutor(framework->id, executorId);
+ foreachkey (const ExecutorID& executorId,
+ utils::copy(framework->executors[slaveId])) {
+ removeExecutor(slave, framework->id, executorId);
}
}
}
@@ -4134,14 +4104,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
if (slave->executors.contains(framework->id)) {
foreachkey (const ExecutorID& executorId,
utils::copy(slave->executors[framework->id])) {
- allocator->resourcesRecovered(
- framework->id,
- slave->id,
- slave->executors[framework->id][executorId].resources(),
- None());
-
- framework->removeExecutor(slave->id, executorId);
- slave->removeExecutor(framework->id, executorId);
+ removeExecutor(slave, framework->id, executorId);
}
}
}
@@ -4313,6 +4276,14 @@ void Master::removeSlave(Slave* slave)
}
}
+ // Remove executors from the slave for proper resource accounting.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->executors)) {
+ foreachkey (const ExecutorID& executorId,
+ utils::copy(slave->executors[frameworkId])) {
+ removeExecutor(slave, frameworkId, executorId);
+ }
+ }
+
foreach (Offer* offer, utils::copy(slave->offers)) {
// TODO(vinod): We don't need to call 'Allocator::resourcesRecovered'
// once MESOS-621 is fixed.
@@ -4323,24 +4294,6 @@ void Master::removeSlave(Slave* slave)
removeOffer(offer, true); // Rescind!
}
- // Remove executors from the slave for proper resource accounting.
- foreachkey (const FrameworkID& frameworkId, slave->executors) {
- Framework* framework = getFramework(frameworkId);
- if (framework != NULL) {
- foreachkey (const ExecutorID& executorId, slave->executors[frameworkId]) {
- // TODO(vinod): We don't need to call 'Allocator::resourcesRecovered'
- // once MESOS-621 is fixed.
- allocator->resourcesRecovered(
- frameworkId,
- slave->id,
- slave->executors[frameworkId][executorId].resources(),
- None());
-
- framework->removeExecutor(slave->id, executorId);
- }
- }
- }
-
// Mark the slave as being removed.
slaves.removing.insert(slave->id);
slaves.registered.erase(slave->id);
@@ -4455,6 +4408,32 @@ void Master::removeTask(Task* task)
}
+// Removes executor 'executorId' of framework 'frameworkId' from 'slave'
+// and hands the executor's resources back to the allocator.
+//
+// 'slave' must be non-NULL and must know the executor; both are CHECKed.
+// The executor is also removed from the framework when getFramework()
+// finds it (the framework may not have re-registered yet).
+//
+// NOTE: Slave::removeExecutor() and Framework::removeExecutor() erase the
+// executor from 'slave->executors' / 'framework->executors', and also
+// erase the enclosing per-framework / per-slave entry once it becomes
+// empty. Callers that loop over either map while calling this must loop
+// over a copy (utils::copy()) at every level they iterate; looping over
+// the live outer map (e.g. 'foreachkey (..., framework->executors)') may
+// invalidate the loop's iterator.
+//
+// NOTE(review): Framework::removeExecutor() CHECK-fails if a framework
+// returned by getFramework() does not know this executor; confirm that
+// every executor tracked by the slave is also tracked by its framework.
+void Master::removeExecutor(
+    Slave* slave,
+    const FrameworkID& frameworkId,
+    const ExecutorID& executorId)
+{
+  CHECK_NOTNULL(slave);
+  CHECK(slave->hasExecutor(frameworkId, executorId));
+
+  // Held by value; the map entry it is read from is erased by
+  // slave->removeExecutor() at the end of this function.
+  ExecutorInfo executor = slave->executors[frameworkId][executorId];
+
+  LOG(INFO) << "Removing executor '" << executorId
+            << "' with resources " << executor.resources()
+            << " of framework " << frameworkId << " on slave " << *slave;
+
+  // Return the executor's resources to the allocator.
+  allocator->resourcesRecovered(
+      frameworkId, slave->id, executor.resources(), None());
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) { // The framework might not be re-registered yet.
+    framework->removeExecutor(slave->id, executorId);
+  }
+
+  slave->removeExecutor(frameworkId, executorId);
+}
+
+
void Master::offerTimeout(const OfferID& offerId)
{
Offer* offer = getOffer(offerId);
http://git-wip-us.apache.org/repos/asf/mesos/blob/b6211913/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index b492600..54e3918 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -368,9 +368,15 @@ protected:
const Filters& filters,
const process::Future<std::list<process::Future<Option<Error> > > >& f);
- // Remove a task.
+ // Remove a task and recover its resources.
void removeTask(Task* task);
+  // Remove an executor and recover its resources. The executor is
+  // removed from 'slave' (which must know it) and, if the framework is
+  // known to the master, from the framework as well.
+  // Entries are erased from 'slave->executors' and
+  // 'framework->executors', so callers looping over either map while
+  // calling this must loop over a copy.
+  void removeExecutor(
+      Slave* slave,
+      const FrameworkID& frameworkId,
+      const ExecutorID& executorId);
+
// Forwards the update to the framework.
void forward(
const StatusUpdate& update,
@@ -897,14 +903,15 @@ struct Slave
void removeExecutor(const FrameworkID& frameworkId,
const ExecutorID& executorId)
{
- if (hasExecutor(frameworkId, executorId)) {
- // Update the resources in use to reflect removing this executor.
- resourcesInUse -= executors[frameworkId][executorId].resources();
+ CHECK(hasExecutor(frameworkId, executorId))
+ << "Unknown executor " << executorId << " of framework " << frameworkId;
- executors[frameworkId].erase(executorId);
- if (executors[frameworkId].size() == 0) {
- executors.erase(frameworkId);
- }
+ // Update the resources in use to reflect removing this executor.
+ resourcesInUse -= executors[frameworkId][executorId].resources();
+
+ executors[frameworkId].erase(executorId);
+ if (executors[frameworkId].empty()) {
+ executors.erase(frameworkId);
}
}
@@ -1047,14 +1054,17 @@ struct Framework
void removeExecutor(const SlaveID& slaveId,
const ExecutorID& executorId)
{
- if (hasExecutor(slaveId, executorId)) {
- // Update our resources to reflect removing this executor.
- resources -= executors[slaveId][executorId].resources();
+ CHECK(hasExecutor(slaveId, executorId))
+ << "Unknown executor " << executorId
+ << " of framework " << id
+ << " of slave " << slaveId;
- executors[slaveId].erase(executorId);
- if (executors[slaveId].size() == 0) {
- executors.erase(slaveId);
- }
+ // Update our resources to reflect removing this executor.
+ resources -= executors[slaveId][executorId].resources();
+
+ executors[slaveId].erase(executorId);
+ if (executors[slaveId].empty()) {
+ executors.erase(slaveId);
}
}
[2/2] git commit: Minor cleanups to the Master code.
Posted by bm...@apache.org.
Minor cleanups to the Master code.
Review: https://reviews.apache.org/r/25566
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0760b007
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0760b007
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0760b007
Branch: refs/heads/master
Commit: 0760b007ad65bc91e8cea377339978c78d36d247
Parents: b621191
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:48:20 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Sep 17 15:46:38 2014 -0700
----------------------------------------------------------------------
src/master/http.cpp | 2 +-
src/master/master.cpp | 53 +++++++++++++++++++++++++++-------------------
src/master/master.hpp | 36 +++++++++++++++++--------------
3 files changed, 52 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index 6dd11fe..8db4d9a 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -405,7 +405,7 @@ Future<Response> Master::Http::stats(const Request& request)
totalResources += resource;
}
}
- foreach (const Resource& resource, slave->resourcesInUse) {
+ foreach (const Resource& resource, slave->used()) {
if (resource.type() == Value::SCALAR) {
usedResources += resource;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 11d75fb..52a7409 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -3763,10 +3763,10 @@ void Master::reconcile(
// missing from the slave. This could happen if the task was
// dropped by the slave (e.g., slave exited before getting the
// task or the task was launched while slave was in recovery).
- // NOTE: keys() and values() are used since statusUpdate()
- // modifies slave->tasks.
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
+ // NOTE: copies are used because statusUpdate() modifies
+ // slave->tasks.
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
<< " of framework " << task->framework_id()
@@ -4010,10 +4010,11 @@ void Master::removeFramework(Framework* framework)
// Remove the framework's offers (if they weren't removed before).
foreach (Offer* offer, utils::copy(framework->offers)) {
- allocator->resourcesRecovered(offer->framework_id(),
- offer->slave_id(),
- Resources(offer->resources()),
- None());
+ allocator->resourcesRecovered(
+ offer->framework_id(),
+ offer->slave_id(),
+ offer->resources(),
+ None());
removeOffer(offer);
}
@@ -4078,9 +4079,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
// Remove pointers to framework's tasks in slaves, and send status
// updates.
- // NOTE: values() is used because statusUpdate() modifies
+ // NOTE: A copy is used because statusUpdate() modifies
// slave->tasks.
- foreach (Task* task, slave->tasks[framework->id].values()) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
// Remove tasks that belong to this framework.
if (task->framework_id() == framework->id) {
// A framework might not actually exist because the master failed
@@ -4257,8 +4258,8 @@ void Master::removeSlave(Slave* slave)
// updates. Rather, build up the updates so that we can send them
// after the slave is removed from the registry.
vector<StatusUpdate> updates;
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
+ foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
+ foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
const StatusUpdate& update = protobuf::createStatusUpdate(
task->framework_id(),
task->slave_id(),
@@ -4372,6 +4373,21 @@ void Master::removeTask(Task* task)
{
CHECK_NOTNULL(task);
+ Slave* slave = CHECK_NOTNULL(getSlave(task->slave_id()));
+
+ if (!protobuf::isTerminalState(task->state())) {
+ LOG(WARNING) << "Removing task " << task->task_id()
+ << " with resources " << task->resources()
+ << " of framework " << task->framework_id()
+ << " on slave " << *slave
+ << " in non-terminal state " << task->state();
+ } else {
+ LOG(INFO) << "Removing task " << task->task_id()
+ << " with resources " << task->resources()
+ << " of framework " << task->framework_id()
+ << " on slave " << *slave;
+ }
+
// Remove from framework.
Framework* framework = getFramework(task->framework_id());
if (framework != NULL) { // A framework might not be re-connected yet.
@@ -4379,15 +4395,13 @@ void Master::removeTask(Task* task)
}
// Remove from slave.
- Slave* slave = getSlave(task->slave_id());
- CHECK_NOTNULL(slave);
slave->removeTask(task);
// Tell the allocator about the recovered resources.
allocator->resourcesRecovered(
task->framework_id(),
task->slave_id(),
- Resources(task->resources()),
+ task->resources(),
None());
// Update the task state metric.
@@ -4396,12 +4410,7 @@ void Master::removeTask(Task* task)
case TASK_FAILED: ++metrics.tasks_failed; break;
case TASK_KILLED: ++metrics.tasks_killed; break;
case TASK_LOST: ++metrics.tasks_lost; break;
- default:
- LOG(WARNING) << "Removing task " << task->task_id()
- << " of framework " << task->framework_id()
- << " and slave " << task->slave_id()
- << " in non-terminal state " << task->state();
- break;
+ default: break;
}
delete task;
@@ -4934,7 +4943,7 @@ double Master::_resources_used(const std::string& name)
double used = 0.0;
foreachvalue (Slave* slave, slaves.registered) {
- foreach (const Resource& resource, slave->resourcesInUse) {
+ foreach (const Resource& resource, slave->used()) {
if (resource.name() == name && resource.type() == Value::SCALAR) {
used += resource.scalar().value();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0760b007/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 54e3918..80d7535 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -839,7 +839,6 @@ struct Slave
LOG(INFO) << "Adding task " << task->task_id()
<< " with resources " << task->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesInUse += task->resources();
}
void removeTask(Task* task)
@@ -854,10 +853,6 @@ struct Slave
}
killedTasks.remove(task->framework_id(), task->task_id());
- LOG(INFO) << "Removing task " << task->task_id()
- << " with resources " << task->resources()
- << " on slave " << id << " (" << info.hostname() << ")";
- resourcesInUse -= task->resources();
}
void addOffer(Offer* offer)
@@ -867,7 +862,6 @@ struct Slave
VLOG(1) << "Adding offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesOffered += offer->resources();
}
void removeOffer(Offer* offer)
@@ -877,7 +871,6 @@ struct Slave
VLOG(1) << "Removing offer " << offer->id()
<< " with resources " << offer->resources()
<< " on slave " << id << " (" << info.hostname() << ")";
- resourcesOffered -= offer->resources();
}
bool hasExecutor(const FrameworkID& frameworkId,
@@ -895,9 +888,6 @@ struct Slave
<< " of framework " << frameworkId;
executors[frameworkId][executorInfo.executor_id()] = executorInfo;
-
- // Update the resources in use to reflect running this executor.
- resourcesInUse += executorInfo.resources();
}
void removeExecutor(const FrameworkID& frameworkId,
@@ -906,15 +896,32 @@ struct Slave
CHECK(hasExecutor(frameworkId, executorId))
<< "Unknown executor " << executorId << " of framework " << frameworkId;
- // Update the resources in use to reflect removing this executor.
- resourcesInUse -= executors[frameworkId][executorId].resources();
-
executors[frameworkId].erase(executorId);
if (executors[frameworkId].empty()) {
executors.erase(frameworkId);
}
}
+  // Returns the resources in use on this slave: the sum of the resources
+  // of every task in 'tasks' and of every executor in 'executors', across
+  // all frameworks. Offered resources are not included.
+  //
+  // The result is recomputed from the maps on every call (this replaces
+  // the 'resourcesInUse' counter), so the cost of a call grows with the
+  // number of tasks and executors on the slave.
+  Resources used() const
+  {
+    Resources used;
+
+    // find() is used rather than operator[] because this method is
+    // const and operator[] is not available on a const map.
+    foreachkey (const FrameworkID& frameworkId, tasks) {
+      foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
+        used += task->resources();
+      }
+    }
+
+    foreachkey (const FrameworkID& frameworkId, executors) {
+      foreachvalue (const ExecutorInfo& executorInfo,
+                    executors.find(frameworkId)->second) {
+        used += executorInfo.resources();
+      }
+    }
+
+    return used;
+  }
+
const SlaveID id;
const SlaveInfo info;
@@ -927,9 +934,6 @@ struct Slave
// enabled because we expect it reregister after recovery.
bool disconnected;
- Resources resourcesOffered; // Resources offered.
- Resources resourcesInUse; // Resources used by tasks and executors.
-
// Executors running on this slave.
hashmap<FrameworkID, hashmap<ExecutorID, ExecutorInfo> > executors;