You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/02/15 01:11:31 UTC
git commit: Small cleanups of the Master code.
Repository: mesos
Updated Branches:
refs/heads/master 311743f6b -> 765c938f5
Small cleanups of the Master code.
Review: https://reviews.apache.org/r/15114
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/765c938f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/765c938f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/765c938f
Branch: refs/heads/master
Commit: 765c938f59a680adb3c38c90ad3bd1138c17926c
Parents: 311743f
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Wed Oct 30 22:50:38 2013 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Fri Feb 14 15:48:51 2014 -0800
----------------------------------------------------------------------
src/master/master.cpp | 432 +++++++++++++++++++++++----------------------
src/master/master.hpp | 7 +-
2 files changed, 229 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/765c938f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a4e1b1f..f24df23 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -242,79 +242,7 @@ Master::Master(
}
-Master::~Master()
-{
- LOG(INFO) << "Shutting down master";
-
- // Remove the frameworks.
- // Note we are not deleting the pointers to the frameworks from the
- // allocator or the roles because it is unnecessary bookkeeping at
- // this point since we are shutting down.
- foreachvalue (Framework* framework, frameworks) {
- // Remove pointers to the framework's tasks in slaves.
- foreachvalue (Task* task, utils::copy(framework->tasks)) {
- Slave* slave = getSlave(task->slave_id());
- // Since we only find out about tasks when the slave re-registers,
- // it must be the case that the slave exists!
- CHECK(slave != NULL)
- << "Unknown slave " << task->slave_id()
- << " in the task " << task->task_id();
-
- removeTask(task);
- }
-
- // Remove the framework's offers (if they weren't removed before).
- foreach (Offer* offer, utils::copy(framework->offers)) {
- removeOffer(offer);
- }
-
- delete framework;
- }
- frameworks.clear();
-
- foreachvalue (Future<Nothing> future, authenticating) {
- // NOTE: This is necessary during tests because a copy of
- // this future is used to setup authentication timeout. If a
- // test doesn't discard this future, authentication timeout might
- // fire in a different test and any associated callbacks
- // (e.g., '_authenticate()') would be called. This is because the
- // master pid doesn't change across the tests.
- // TODO(vinod): This seems to be a bug in libprocess or the
- // testing infrastructure.
- future.discard();
- }
-
- CHECK_EQ(offers.size(), 0UL);
-
- foreachvalue (Slave* slave, slaves) {
- LOG(INFO) << "Removing slave " << slave->id
- << " (" << slave->info.hostname() << ")";
-
- // Remove tasks that are in the slave but not in any framework.
- // This could happen when the framework has yet to reregister
- // after master failover.
- // NOTE: keys() and values() are used because slave->tasks is
- // modified by removeTask()!
- foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
- foreach (Task* task, slave->tasks[frameworkId].values()) {
- removeTask(task);
- }
- }
-
- // Kill the slave observer.
- terminate(slave->observer);
- wait(slave->observer);
-
- delete slave->observer;
- delete slave;
- }
- slaves.clear();
-
- terminate(whitelistWatcher);
- wait(whitelistWatcher);
-
- delete whitelistWatcher;
-}
+Master::~Master() {}
void Master::initialize()
@@ -585,6 +513,76 @@ void Master::initialize()
void Master::finalize()
{
LOG(INFO) << "Master terminating";
+
+ // Remove the frameworks.
+ // Note we are not deleting the pointers to the frameworks from the
+ // allocator or the roles because it is unnecessary bookkeeping at
+ // this point since we are shutting down.
+ foreachvalue (Framework* framework, frameworks) {
+ // Remove pointers to the framework's tasks in slaves.
+ foreachvalue (Task* task, utils::copy(framework->tasks)) {
+ Slave* slave = getSlave(task->slave_id());
+ // Since we only find out about tasks when the slave re-registers,
+ // it must be the case that the slave exists!
+ CHECK(slave != NULL)
+ << "Unknown slave " << task->slave_id()
+ << " in the task " << task->task_id();
+
+ removeTask(task);
+ }
+
+ // Remove the framework's offers (if they weren't removed before).
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ removeOffer(offer);
+ }
+
+ delete framework;
+ }
+ frameworks.clear();
+
+ CHECK_EQ(offers.size(), 0UL);
+
+ foreachvalue (Slave* slave, slaves) {
+ // Remove tasks that are in the slave but not in any framework.
+ // This could happen when the framework has yet to re-register
+ // after master failover.
+ // NOTE: keys() and values() are used because slave->tasks is
+ // modified by removeTask()!
+ foreach (const FrameworkID& frameworkId, slave->tasks.keys()) {
+ foreach (Task* task, slave->tasks[frameworkId].values()) {
+ removeTask(task);
+ }
+ }
+
+ // Kill the slave observer.
+ terminate(slave->observer);
+ wait(slave->observer);
+
+ delete slave->observer;
+ delete slave;
+ }
+ slaves.clear();
+
+ foreachvalue (Future<Nothing> future, authenticating) {
+ // NOTE: This is necessary during tests because a copy of
+ // this future is used to setup authentication timeout. If a
+ // test doesn't discard this future, authentication timeout might
+ // fire in a different test and any associated callbacks
+ // (e.g., '_authenticate()') would be called. This is because the
+ // master pid doesn't change across the tests.
+ // TODO(vinod): This seems to be a bug in libprocess or the
+ // testing infrastructure.
+ future.discard();
+ }
+
+ foreachvalue (Role* role, roles) {
+ delete role;
+ }
+ roles.clear();
+
+ terminate(whitelistWatcher);
+ wait(whitelistWatcher);
+ delete whitelistWatcher;
}
@@ -1698,30 +1696,11 @@ void Master::killTask(
}
Task* task = framework->getTask(taskId);
- if (task != NULL) {
- Slave* slave = getSlave(task->slave_id());
- CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
-
- // We add the task to 'killedTasks' here because the slave
- // might be partitioned or disconnected but the master
- // doesn't know it yet.
- slave->killedTasks.put(frameworkId, taskId);
-
- // NOTE: This task will be properly reconciled when the
- // disconnected slave re-registers with the master.
- if (!slave->disconnected) {
- LOG(INFO) << "Telling slave " << slave->id << " ("
- << slave->info.hostname() << ")"
- << " to kill task " << taskId
- << " of framework " << frameworkId;
+ if (task == NULL) {
+ // TODO(bmahler): This is incorrect in some cases, see:
+ // https://issues.apache.org/jira/browse/MESOS-783
- KillTaskMessage message;
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_task_id()->MergeFrom(taskId);
- send(slave->pid, message);
- }
- } else {
- // TODO(benh): Once the scheduler has persistance and
+ // TODO(benh): Once the scheduler has persistence and
// high-availability of it's tasks, it will be the one that
// determines that this invocation of 'killTask' is silly, and
// can just return "locally" (i.e., after hitting only the other
@@ -1729,7 +1708,8 @@ void Master::killTask(
LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
- << " because the task cannot be found";
+ << " because it cannot be found, sending TASK_LOST";
+
StatusUpdateMessage message;
StatusUpdate* update = message.mutable_update();
update->mutable_framework_id()->MergeFrom(frameworkId);
@@ -1740,6 +1720,29 @@ void Master::killTask(
update->set_timestamp(Clock::now().secs());
update->set_uuid(UUID::random().toBytes());
send(framework->pid, message);
+ return;
+ }
+
+ Slave* slave = getSlave(task->slave_id());
+ CHECK(slave != NULL) << "Unknown slave " << task->slave_id();
+
+ // We add the task to 'killedTasks' here because the slave
+ // might be partitioned or disconnected but the master
+ // doesn't know it yet.
+ slave->killedTasks.put(frameworkId, taskId);
+
+ // NOTE: This task will be properly reconciled when the
+ // disconnected slave re-registers with the master.
+ if (!slave->disconnected) {
+ LOG(INFO) << "Telling slave " << slave->id << " ("
+ << slave->info.hostname() << ")"
+ << " to kill task " << taskId
+ << " of framework " << frameworkId;
+
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, message);
}
}
@@ -1773,33 +1776,35 @@ void Master::schedulerMessage(
}
Slave* slave = getSlave(slaveId);
- if (slave != NULL) {
- if (!slave->disconnected) {
- LOG(INFO) << "Sending framework message for framework "
- << frameworkId << " to slave " << slaveId
- << " (" << slave->info.hostname() << ")";
-
- FrameworkToExecutorMessage message;
- message.mutable_slave_id()->MergeFrom(slaveId);
- message.mutable_framework_id()->MergeFrom(frameworkId);
- message.mutable_executor_id()->MergeFrom(executorId);
- message.set_data(data);
- send(slave->pid, message);
-
- stats.validFrameworkMessages++;
- } else {
- LOG(WARNING) << "Cannot send framework message for framework "
- << frameworkId << " to slave " << slaveId
- << " (" << slave->info.hostname() << ")"
- << " because slave is disconnected";
- stats.invalidFrameworkMessages++;
- }
- } else {
+ if (slave == NULL) {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
<< " because slave does not exist";
stats.invalidFrameworkMessages++;
+ return;
+ }
+
+ if (slave->disconnected) {
+ LOG(WARNING) << "Cannot send framework message for framework "
+ << frameworkId << " to slave " << slaveId
+ << " (" << slave->info.hostname() << ")"
+ << " because slave is disconnected";
+ stats.invalidFrameworkMessages++;
+ return;
}
+
+ LOG(INFO) << "Sending framework message for framework "
+ << frameworkId << " to slave " << slaveId
+ << " (" << slave->info.hostname() << ")";
+
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(slave->pid, message);
+
+ stats.validFrameworkMessages++;
}
@@ -1858,93 +1863,97 @@ void Master::reregisterSlave(
}
if (slaveId == "") {
- LOG(ERROR) << "Slave " << from << " re-registered without an id!";
+ LOG(ERROR) << "Shutting down slave " << from << " that re-registered "
+ << "without an id!";
reply(ShutdownMessage());
- } else if (deactivatedSlaves.contains(from)) {
+ return;
+ }
+
+ if (deactivatedSlaves.contains(from)) {
// We disallow deactivated slaves from re-registering. This is
// to ensure that when a master deactivates a slave that was
// partitioned, we don't allow the slave to re-register, as we've
// already informed frameworks that the tasks were lost.
- LOG(ERROR) << "Slave " << slaveId << " at " << from
- << " attempted to re-register after deactivation";
+ LOG(ERROR) << "Shutting down slave " << slaveId << " at " << from
+ << " that attempted to re-register after deactivation";
reply(ShutdownMessage());
- } else {
- Slave* slave = getSlave(slaveId);
-
- if (slave != NULL) {
- slave->reregisteredTime = Clock::now();
-
- // NOTE: This handles the case where a slave tries to
- // re-register with an existing master (e.g. because of a
- // spurious Zookeeper session expiration or after the slave
- // recovers after a restart).
- // For now, we assume this slave is not nefarious (eventually
- // this will be handled by orthogonal security measures like key
- // based authentication).
- LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
- << ") is being allowed to re-register with an already"
- << " in use id (" << slaveId << ")";
-
- // TODO(bmahler): There's an implicit assumption here that when
- // the master already knows about this slave, the slave cannot
- // have tasks unknown to the master. This _should_ be the case
- // since the causal relationship is:
- // slave removes task -> master removes task
- // We should enforce this via a CHECK (dangerous), or by shutting
- // down slaves that are found to violate this assumption.
-
- SlaveReregisteredMessage message;
- message.mutable_slave_id()->MergeFrom(slave->id);
- reply(message);
-
- // Update the slave pid and relink to it.
- // NOTE: Re-linking the slave here always rather than only when
- // the slave is disconnected can lead to multiple exited events
- // in succession for a disconnected slave. As a result, we
- // ignore duplicate exited events for disconnected checkpointing
- // slaves.
- // See: https://issues.apache.org/jira/browse/MESOS-675
- slave->pid = from;
- link(slave->pid);
-
- // Reconcile tasks between master and the slave.
- // NOTE: This needs to be done after the registration message is
- // sent to the slave and the new pid is linked.
- reconcile(slave, executorInfos, tasks);
-
- // If this is a disconnected slave, add it back to the allocator.
- // This is done after reconciliation to ensure the allocator's
- // offers include the recovered resources initially on this
- // slave.
- if (slave->disconnected) {
- slave->disconnected = false; // Reset the flag.
- allocator->slaveReconnected(slaveId);
- }
- } else {
- // NOTE: This handles the case when the slave tries to
- // re-register with a failed over master.
- slave = new Slave(slaveInfo, slaveId, from, Clock::now());
- slave->reregisteredTime = Clock::now();
+ return;
+ }
- LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
- << slave->pid << " (" << slave->info.hostname() << ")";
+ Slave* slave = getSlave(slaveId);
+ if (slave != NULL) {
+ slave->reregisteredTime = Clock::now();
+
+ // NOTE: This handles the case where a slave tries to
+ // re-register with an existing master (e.g. because of a
+ // spurious Zookeeper session expiration or after the slave
+ // recovers after a restart).
+ // For now, we assume this slave is not nefarious (eventually
+ // this will be handled by orthogonal security measures like key
+ // based authentication).
+ LOG(WARNING) << "Slave at " << from << " (" << slave->info.hostname()
+ << ") is being allowed to re-register with an already"
+ << " in use id (" << slaveId << ")";
+
+ // TODO(bmahler): There's an implicit assumption here that when
+ // the master already knows about this slave, the slave cannot
+ // have tasks unknown to the master. This _should_ be the case
+ // since the causal relationship is:
+ // slave removes task -> master removes task
+ // We should enforce this via a CHECK (dangerous), or by shutting
+ // down slaves that are found to violate this assumption.
- readdSlave(slave, executorInfos, tasks);
+ SlaveReregisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ reply(message);
+
+ // Update the slave pid and relink to it.
+ // NOTE: Re-linking the slave here always rather than only when
+ // the slave is disconnected can lead to multiple exited events
+ // in succession for a disconnected slave. As a result, we
+ // ignore duplicate exited events for disconnected checkpointing
+ // slaves.
+ // See: https://issues.apache.org/jira/browse/MESOS-675
+ slave->pid = from;
+ link(slave->pid);
+
+ // Reconcile tasks between master and the slave.
+ // NOTE: This needs to be done after the registration message is
+ // sent to the slave and the new pid is linked.
+ reconcile(slave, executorInfos, tasks);
+
+ // If this is a disconnected slave, add it back to the allocator.
+ // This is done after reconciliation to ensure the allocator's
+ // offers include the recovered resources initially on this
+ // slave.
+ if (slave->disconnected) {
+ slave->disconnected = false; // Reset the flag.
+ allocator->slaveReconnected(slaveId);
}
+ } else {
+ // NOTE: This handles the case when the slave tries to
+ // re-register with a failed over master.
+ slave = new Slave(slaveInfo, slaveId, from, Clock::now());
+ slave->reregisteredTime = Clock::now();
- // Send the latest framework pids to the slave.
- CHECK_NOTNULL(slave);
- hashset<UPID> pids;
- foreach (const Task& task, tasks) {
- Framework* framework = getFramework(task.framework_id());
- if (framework != NULL && !pids.contains(framework->pid)) {
- UpdateFrameworkMessage message;
- message.mutable_framework_id()->MergeFrom(framework->id);
- message.set_pid(framework->pid);
- send(slave->pid, message);
+ LOG(INFO) << "Attempting to re-register slave " << slave->id << " at "
+ << slave->pid << " (" << slave->info.hostname() << ")";
- pids.insert(framework->pid);
- }
+ readdSlave(slave, executorInfos, tasks);
+ }
+
+ // Send the latest framework pids to the slave.
+ CHECK_NOTNULL(slave);
+ hashset<UPID> pids;
+ foreach (const Task& task, tasks) {
+ Framework* framework = getFramework(task.framework_id());
+ if (framework != NULL && !pids.contains(framework->pid)) {
+ UpdateFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ send(slave->pid, message);
+
+ pids.insert(framework->pid);
}
}
}
@@ -1997,22 +2006,15 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
<< " which is deactivated slave " << update.slave_id()
<< "(" << slave->info.hostname() << ")";
- Framework* framework = getFramework(update.framework_id());
- if (framework == NULL) {
- LOG(WARNING) << "Ignoring status update " << update
- << " from " << pid << " ("
- << slave->info.hostname() << "): error, couldn't lookup "
- << "framework " << update.framework_id();
+ // Forward the update to the framework.
+ Try<Nothing> _forward = forward(update, pid);
+ if (_forward.isError()) {
+ LOG(WARNING) << "Ignoring status update " << update << " from " << pid
+ << " (" << slave->info.hostname() << "): " << _forward.error();
stats.invalidStatusUpdates++;
return;
}
- // Pass on the (transformed) status update to the framework.
- StatusUpdateMessage message;
- message.mutable_update()->MergeFrom(update);
- message.set_pid(pid);
- send(framework->pid, message);
-
// Lookup the task and see if we need to update anything locally.
Task* task = slave->getTask(update.framework_id(), status.task_id());
if (task == NULL) {
@@ -2043,6 +2045,22 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
}
+Try<Nothing> Master::forward(const StatusUpdate& update, const UPID& pid)
+{
+ Framework* framework = getFramework(update.framework_id());
+ if (framework == NULL) {
+ return Error("Unknown framework " + stringify(update.framework_id()));
+ }
+
+ // Pass on the (transformed) status update to the framework.
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(pid);
+ send(framework->pid, message);
+ return Nothing();
+}
+
+
void Master::exitedExecutor(
const UPID& from,
const SlaveID& slaveId,
http://git-wip-us.apache.org/repos/asf/mesos/blob/765c938f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 737bd8b..00d630a 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -254,6 +254,9 @@ protected:
// Remove a task.
void removeTask(Task* task);
+ // Forwards the update to the framework.
+ Try<Nothing> forward(const StatusUpdate& update, const UPID& pid);
+
// Remove an offer and optionally rescind the offer as well.
void removeOffer(Offer* offer, bool rescind = false);
@@ -387,7 +390,6 @@ struct Slave
info(_info),
pid(_pid),
registeredTime(time),
- lastHeartbeat(time),
disconnected(false),
observer(NULL) {}
@@ -493,7 +495,6 @@ struct Slave
Time registeredTime;
Option<Time> reregisteredTime;
- Time lastHeartbeat;
// We mark a slave 'disconnected' when it has checkpointing
// enabled because we expect it reregister after recovery.
@@ -623,8 +624,8 @@ struct Framework
}
}
-
const FrameworkID id; // TODO(benh): Store this in 'info'.
+
const FrameworkInfo info;
UPID pid;