You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/10/08 20:45:54 UTC
[7/8] git commit: Removed the need for Master::readdSlave.
Removed the need for Master::readdSlave.
Review: https://reviews.apache.org/r/26204
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a4a0d158
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a4a0d158
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a4a0d158
Branch: refs/heads/master
Commit: a4a0d1580815360083a46b43d458c5babd58c632
Parents: ee4f879
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Mon Sep 29 14:07:40 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Wed Oct 8 11:45:12 2014 -0700
----------------------------------------------------------------------
src/master/master.cpp | 105 +++++++++++++++++++--------------------------
src/master/master.hpp | 24 ++++++++---
2 files changed, 60 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index e445c86..26cd29a 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2993,11 +2993,16 @@ void Master::_registerSlave(
version.empty() ? Option<string>::none() : version,
Clock::now());
- LOG(INFO) << "Registered slave " << *slave
- << " with " << slave->info.resources();
++metrics.slave_registrations;
addSlave(slave);
+
+ SlaveRegisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+
+ LOG(INFO) << "Registered slave " << *slave
+ << " with " << slave->info.resources();
}
}
@@ -3188,15 +3193,22 @@ void Master::_reregisterSlave(
slaveInfo,
pid,
version.empty() ? Option<string>::none() : version,
- Clock::now());
+ Clock::now(),
+ executorInfos,
+ tasks);
slave->reregisteredTime = Clock::now();
- LOG(INFO) << "Re-registered slave " << *slave
- << " with " << slave->info.resources();
++metrics.slave_reregistrations;
- readdSlave(slave, executorInfos, tasks, completedFrameworks);
+ addSlave(slave, completedFrameworks);
+
+ SlaveReregisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+
+ LOG(INFO) << "Re-registered slave " << *slave
+ << " with " << slave->info.resources();
__reregisterSlave(slave, tasks);
}
@@ -4214,7 +4226,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
}
-void Master::addSlave(Slave* slave, bool reregister)
+void Master::addSlave(
+ Slave* slave,
+ const vector<Archive::Framework>& completedFrameworks)
{
CHECK_NOTNULL(slave);
@@ -4223,70 +4237,37 @@ void Master::addSlave(Slave* slave, bool reregister)
link(slave->pid);
- if (!reregister) {
- SlaveRegisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(slave->pid, message);
- } else {
- SlaveReregisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- send(slave->pid, message);
- }
-
// Set up an observer for the slave.
slave->observer = new SlaveObserver(
slave->pid, slave->info, slave->id, self());
spawn(slave->observer);
- if (!reregister) {
- allocator->slaveAdded(slave->id, slave->info, slave->usedResources);
- }
-}
-
-
-void Master::readdSlave(
- Slave* slave,
- const vector<ExecutorInfo>& executorInfos,
- const vector<Task>& tasks,
- const vector<Archive::Framework>& completedFrameworks)
-{
- CHECK_NOTNULL(slave);
-
- addSlave(slave, true);
-
- foreach (const ExecutorInfo& executorInfo, executorInfos) {
- // TODO(bmahler): ExecutorInfo.framework_id is set by the Scheduler
- // Driver in 0.14.0. Therefore, in 0.15.0, the slave no longer needs
- // to set it, and we could remove this CHECK if desired.
- CHECK(executorInfo.has_framework_id())
- << "Executor " << executorInfo.executor_id()
- << " doesn't have frameworkId set";
-
- slave->addExecutor(executorInfo.framework_id(), executorInfo);
-
- Framework* framework = getFramework(executorInfo.framework_id());
- if (framework != NULL) { // The framework might not be re-registered yet.
- framework->addExecutor(slave->id, executorInfo);
+ // Add the slave's executors to the frameworks.
+ foreachkey (const FrameworkID& frameworkId, slave->executors) {
+ foreachvalue (const ExecutorInfo& executorInfo,
+ slave->executors[frameworkId]) {
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL) { // The framework might not be re-registered yet.
+ framework->addExecutor(slave->id, executorInfo);
+ }
}
}
- foreach (const Task& task, tasks) {
- Task* t = new Task(task);
-
- // Add the task to the slave.
- slave->addTask(t);
-
- Framework* framework = getFramework(task.framework_id());
- if (framework != NULL) { // The framework might not be re-registered yet.
- framework->addTask(t);
- } else {
- // TODO(benh): We should really put a timeout on how long we
- // keep tasks running on a slave that never have frameworks
- // reregister and claim them.
- LOG(WARNING) << "Possibly orphaned task " << task.task_id()
- << " of framework " << task.framework_id()
- << " running on slave " << *slave;
+ // Add the slave's tasks to the frameworks.
+ foreachkey (const FrameworkID& frameworkId, slave->tasks) {
+ foreachvalue (Task* task, slave->tasks[frameworkId]) {
+ Framework* framework = getFramework(task->framework_id());
+ if (framework != NULL) { // The framework might not be re-registered yet.
+ framework->addTask(task);
+ } else {
+ // TODO(benh): We should really put a timeout on how long we
+ // keep tasks running on a slave that never have frameworks
+ // reregister and claim them.
+ LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+ << " of framework " << task->framework_id()
+ << " running on slave " << *slave;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/a4a0d158/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index e97d213..37ce31a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -335,13 +335,10 @@ protected:
void deactivate(Slave* slave);
// Add a slave.
- void addSlave(Slave* slave, bool reregister = false);
-
- void readdSlave(
+ void addSlave(
Slave* slave,
- const std::vector<ExecutorInfo>& executorInfos,
- const std::vector<Task>& tasks,
- const std::vector<Archive::Framework>& completedFrameworks);
+ const std::vector<Archive::Framework>& completedFrameworks =
+ std::vector<Archive::Framework>());
// Remove the slave from the registrar and from the master's state.
void removeSlave(Slave* slave);
@@ -820,7 +817,11 @@ struct Slave
Slave(const SlaveInfo& _info,
const process::UPID& _pid,
const Option<std::string> _version,
- const process::Time& _registeredTime)
+ const process::Time& _registeredTime,
+ const std::vector<ExecutorInfo> executorInfos =
+ std::vector<ExecutorInfo>(),
+ const std::vector<Task> tasks =
+ std::vector<Task>())
: id(_info.id()),
info(_info),
pid(_pid),
@@ -831,6 +832,15 @@ struct Slave
observer(NULL)
{
CHECK(_info.has_id());
+
+ foreach (const ExecutorInfo& executorInfo, executorInfos) {
+ CHECK(executorInfo.has_framework_id());
+ addExecutor(executorInfo.framework_id(), executorInfo);
+ }
+
+ foreach (const Task& task, tasks) {
+ addTask(new Task(task));
+ }
}
~Slave() {}