You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:27:16 UTC
svn commit: r1132335 [2/4] - in /incubator/mesos/trunk: include/
include/mesos/ src/ src/common/ src/detector/ src/examples/ src/exec/
src/java/jni/ src/launcher/ src/local/ src/master/ src/messaging/
src/python/native/ src/sched/ src/slave/ src/tests/
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132335&r1=1132334&r2=1132335&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:27:14 2011
@@ -1,19 +1,44 @@
#include <iomanip>
+#include <fstream>
+#include <sstream>
#include <glog/logging.h>
+#include <google/protobuf/descriptor.h>
+
+#include <process/run.hpp>
+
+#include "config/config.hpp"
+
+#include "common/build.hpp"
#include "common/date_utils.hpp"
#include "allocator.hpp"
#include "allocator_factory.hpp"
#include "master.hpp"
+#include "slaves_manager.hpp"
#include "webui.hpp"
+using namespace mesos;
+using namespace mesos::internal;
+using namespace mesos::internal::master;
+
+using boost::bad_lexical_cast;
+using boost::lexical_cast;
+using boost::unordered_map;
+using boost::unordered_set;
+
+using process::HttpOKResponse;
+using process::HttpResponse;
+using process::HttpRequest;
+using process::PID;
+using process::Process;
+using process::Promise;
+using process::UPID;
+
using std::endl;
-using std::make_pair;
using std::map;
using std::max;
-using std::min;
using std::ostringstream;
using std::pair;
using std::set;
@@ -22,77 +47,69 @@ using std::setw;
using std::string;
using std::vector;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::master;
-
namespace {
-// A process that periodically pings the master to check filter expiries, etc
-class AllocatorTimer : public MesosProcess
+// A process that periodically pings the master to check filter
+// expiries, etc.
+class AllocatorTimer : public Process<AllocatorTimer>
{
-private:
- const PID master;
+public:
+ AllocatorTimer(const PID<Master>& _master) : master(_master) {}
protected:
- void operator () ()
+ virtual void operator () ()
{
link(master);
- do {
- switch (receive(1)) {
- case PROCESS_TIMEOUT:
- send(master, pack<M2M_TIMER_TICK>());
- break;
- case PROCESS_EXIT:
+ while (true) {
+ receive(1);
+ if (name() == process::TIMEOUT) {
+ process::dispatch(master, &Master::timerTick);
+ } else if (name() == process::EXITED) {
return;
}
- } while (true);
+ }
}
-public:
- AllocatorTimer(const PID &_master) : master(_master) {}
+private:
+ const PID<Master> master;
};
// A process that periodically prints frameworks' shares to a file
-class SharesPrinter : public MesosProcess
+class SharesPrinter : public Process<SharesPrinter>
{
-protected:
- PID master;
+public:
+ SharesPrinter(const PID<Master>& _master) : master(_master) {}
+ ~SharesPrinter() {}
- void operator () ()
+protected:
+ virtual void operator () ()
{
int tick = 0;
std::ofstream file ("/mnt/shares");
- if (!file.is_open())
+ if (!file.is_open()) {
LOG(FATAL) << "Could not open /mnt/shares";
+ }
while (true) {
pause(1);
- send(master, pack<M2M_GET_STATE>());
- receive();
- CHECK(msgid() == M2M_GET_STATE_REPLY);
- state::MasterState *state = unpack<M2M_GET_STATE_REPLY, 0>(body());
+ state::MasterState* state = call(master, &Master::getState);
uint32_t total_cpus = 0;
uint32_t total_mem = 0;
- foreach (state::Slave *s, state->slaves) {
+ foreach (state::Slave* s, state->slaves) {
total_cpus += s->cpus;
total_mem += s->mem;
}
if (state->frameworks.empty()) {
- file << "--------------------------------" << std::endl;
+ file << "--------------------------------" << endl;
} else {
- foreach (state::Framework *f, state->frameworks) {
+ foreach (state::Framework* f, state->frameworks) {
double cpu_share = f->cpus / (double) total_cpus;
double mem_share = f->mem / (double) total_mem;
double max_share = max(cpu_share, mem_share);
@@ -107,25 +124,88 @@ protected:
file.close();
}
+private:
+ const PID<Master> master;
+};
+
+} // namespace {
+
+
+namespace mesos { namespace internal { namespace master {
+
+class SlaveObserver : public Process<SlaveObserver>
+{
public:
- SharesPrinter(const PID &_master) : master(_master) {}
- ~SharesPrinter() {}
+ SlaveObserver(const UPID& _slave,
+ const SlaveInfo& _slaveInfo,
+ const SlaveID& _slaveId,
+ const PID<SlavesManager>& _slavesManager)
+ : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
+ slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+
+ virtual ~SlaveObserver() {}
+
+protected:
+ virtual void operator () ()
+ {
+ // Send a ping some interval after we heard the last pong. If we
+ // don't hear a pong, increment the number of timeouts from the
+ // slave and try to send another ping. If we miss too many pongs
+ // in a row, consider the slave dead.
+ send(slave, PING);
+ pinged = true;
+
+ do {
+ receive(SLAVE_PONG_TIMEOUT);
+ if (name() == PONG) {
+ timeouts = 0;
+ pinged = false;
+ } else if (name() == process::TIMEOUT) {
+ if (pinged) {
+ timeouts++;
+ pinged = false;
+ }
+
+ send(slave, PING);
+ pinged = true;
+ } else if (name() == process::TERMINATE) {
+ return;
+ }
+ } while (timeouts < MAX_SLAVE_TIMEOUTS);
+
+ // Tell the slave manager to deactivate the slave, this will take
+ // care of updating the master too.
+ while (!process::call(slavesManager, &SlavesManager::deactivate,
+ slaveInfo.hostname(), slave.port)) {
+ LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
+ pause(5);
+ }
+ }
+
+private:
+ const UPID slave;
+ const SlaveInfo slaveInfo;
+ const SlaveID slaveId;
+ const PID<SlavesManager> slavesManager;
+ int timeouts;
+ bool pinged;
};
-}
+}}} // namespace mesos { namespace internal { namespace master {
Master::Master()
- : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0)
+ : MesosProcess<Master>("master")
{
- allocatorType = "simple";
+ initialize();
}
-Master::Master(const Params& conf_)
- : conf(conf_), nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0)
+Master::Master(const Configuration& conf)
+ : MesosProcess<Master>("master"),
+ conf(conf)
{
- allocatorType = conf.get("allocator", "simple");
+ initialize();
}
@@ -133,63 +213,113 @@ Master::~Master()
{
LOG(INFO) << "Shutting down master";
- delete allocator;
-
- foreachpair (_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks)
- delete task;
- delete framework;
+ foreachpaircopy (_, Framework* framework, frameworks) {
+ removeFramework(framework);
}
- foreachpair (_, Slave *slave, slaves) {
- delete slave;
+ foreachpaircopy (_, Slave* slave, slaves) {
+ removeSlave(slave);
}
- foreachpair (_, SlotOffer *offer, slotOffers) {
- delete offer;
- }
+ delete allocator;
+
+ CHECK(slotOffers.size() == 0);
+
+ process::post(slavesManager->self(), process::TERMINATE);
+ process::wait(slavesManager->self());
+
+ delete slavesManager;
}
-void Master::registerOptions(Configurator* conf)
+void Master::registerOptions(Configurator* configurator)
{
- conf->addOption<string>("allocator", 'a', "Allocation module name", "simple");
- conf->addOption<bool>("root_submissions",
- "Can root submit frameworks?",
- true);
+ SlavesManager::registerOptions(configurator);
+ configurator->addOption<string>("allocator", 'a',
+ "Allocation module name",
+ "simple");
+ configurator->addOption<bool>("root_submissions",
+ "Can root submit frameworks?",
+ true);
}
-state::MasterState * Master::getState()
+Promise<state::MasterState*> Master::getState()
{
- std::ostringstream oss;
- oss << self();
- state::MasterState *state =
- new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
+ state::MasterState* state =
+ new state::MasterState(build::DATE, build::USER, self());
+
+ foreachpair (_, Slave* s, slaves) {
+ Resources resources(s->info.resources());
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::Slave* slave =
+ new state::Slave(s->slaveId.value(), s->info.hostname(),
+ s->info.public_hostname(), cpus.value(),
+ mem.value(), s->connectTime);
- foreachpair (_, Slave *s, slaves) {
- state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
- s->resources.cpus, s->resources.mem, s->connectTime);
state->slaves.push_back(slave);
}
- foreachpair (_, Framework *f, frameworks) {
- state::Framework *framework = new state::Framework(f->id, f->user,
- f->name, f->executorInfo.uri, f->resources.cpus, f->resources.mem,
- f->connectTime);
+ foreachpair (_, Framework* f, frameworks) {
+ Resources resources(f->resources);
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::Framework* framework =
+ new state::Framework(f->frameworkId.value(), f->info.user(),
+ f->info.name(), f->info.executor().uri(),
+ cpus.value(), mem.value(), f->connectTime);
+
state->frameworks.push_back(framework);
- foreachpair (_, Task *t, f->tasks) {
- state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
- t->slaveId, t->state, t->resources.cpus, t->resources.mem);
+
+ foreachpair (_, Task* t, f->tasks) {
+ Resources resources(t->resources());
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::Task* task =
+ new state::Task(t->task_id().value(), t->name(),
+ t->framework_id().value(), t->slave_id().value(),
+ TaskState_descriptor()->FindValueByNumber(t->state())->name(),
+ cpus.value(), mem.value());
+
framework->tasks.push_back(task);
}
- foreach (SlotOffer *o, f->slotOffers) {
- state::SlotOffer *offer = new state::SlotOffer(o->id, o->frameworkId);
- foreach (SlaveResources &r, o->resources) {
- state::SlaveResources *resources = new state::SlaveResources(
- r.slave->id, r.resources.cpus, r.resources.mem);
- offer->resources.push_back(resources);
+
+ foreach (SlotOffer* o, f->slotOffers) {
+ state::SlotOffer* offer =
+ new state::SlotOffer(o->offerId.value(), o->frameworkId.value());
+
+ foreach (const SlaveResources& r, o->resources) {
+ Resources resources(r.resources);
+ Resource::Scalar cpus;
+ Resource::Scalar mem;
+ cpus.set_value(-1);
+ mem.set_value(-1);
+ cpus = resources.getScalar("cpus", cpus);
+ mem = resources.getScalar("mem", mem);
+
+ state::SlaveResources* sr =
+ new state::SlaveResources(r.slave->slaveId.value(),
+ cpus.value(), mem.value());
+
+ offer->resources.push_back(sr);
}
+
framework->offers.push_back(offer);
}
}
@@ -199,57 +329,58 @@ state::MasterState * Master::getState()
// Return connected frameworks that are not in the process of being removed
-vector<Framework *> Master::getActiveFrameworks()
+vector<Framework*> Master::getActiveFrameworks()
{
- vector <Framework *> result;
- foreachpair(_, Framework *framework, frameworks)
- if (framework->active)
+ vector <Framework*> result;
+ foreachpair(_, Framework* framework, frameworks) {
+ if (framework->active) {
result.push_back(framework);
+ }
+ }
return result;
}
// Return connected slaves that are not in the process of being removed
-vector<Slave *> Master::getActiveSlaves()
+vector<Slave*> Master::getActiveSlaves()
{
- vector <Slave *> result;
- foreachpair(_, Slave *slave, slaves)
- if (slave->active)
+ vector <Slave*> result;
+ foreachpair(_, Slave* slave, slaves) {
+ if (slave->active) {
result.push_back(slave);
+ }
+ }
return result;
}
-Framework * Master::lookupFramework(FrameworkID fid)
+Framework* Master::lookupFramework(const FrameworkID& frameworkId)
{
- unordered_map<FrameworkID, Framework *>::iterator it =
- frameworks.find(fid);
- if (it != frameworks.end())
- return it->second;
- else
+ if (frameworks.count(frameworkId) > 0) {
+ return frameworks[frameworkId];
+ } else {
return NULL;
+ }
}
-Slave * Master::lookupSlave(SlaveID sid)
+Slave* Master::lookupSlave(const SlaveID& slaveId)
{
- unordered_map<SlaveID, Slave *>::iterator it =
- slaves.find(sid);
- if (it != slaves.end())
- return it->second;
- else
+ if (slaves.count(slaveId) > 0) {
+ return slaves[slaveId];
+ } else {
return NULL;
+ }
}
-SlotOffer * Master::lookupSlotOffer(OfferID oid)
+SlotOffer* Master::lookupSlotOffer(const OfferID& offerId)
{
- unordered_map<OfferID, SlotOffer *>::iterator it =
- slotOffers.find(oid);
- if (it != slotOffers.end())
- return it->second;
- else
+ if (slotOffers.count(offerId) > 0) {
+ return slotOffers[offerId];
+ } else {
return NULL;
+ }
}
@@ -257,662 +388,1091 @@ void Master::operator () ()
{
LOG(INFO) << "Master started at mesos://" << self();
- // Don't do anything until we get a master ID.
- while (receive() != GOT_MASTER_ID) {
+ // Don't do anything until we get a master token.
+ while (receive() != GOT_MASTER_TOKEN) {
LOG(INFO) << "Oops! We're dropping a message since "
<< "we haven't received an identifier yet!";
}
- string faultToleranceId;
- tie(faultToleranceId) = unpack<GOT_MASTER_ID>(body());
- masterId = DateUtils::currentDate() + "-" + faultToleranceId;
+
+ MSG<GOT_MASTER_TOKEN> msg;
+ msg.ParseFromString(body());
+
+ // The master ID is comprised of the current date and some ephemeral
+ // token (e.g., determined by ZooKeeper).
+
+ masterId = DateUtils::currentDate() + "-" + msg.token();
LOG(INFO) << "Master ID: " << masterId;
+ // Setup slave manager.
+ slavesManager = new SlavesManager(conf, self());
+ process::spawn(slavesManager);
+
// Create the allocator (we do this after the constructor because it
// leaks 'this').
allocator = createAllocator();
- if (!allocator)
+ if (!allocator) {
LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+ }
link(spawn(new AllocatorTimer(self())));
//link(spawn(new SharesPrinter(self())));
while (true) {
- switch (receive()) {
-
- case NEW_MASTER_DETECTED: {
- // TODO(benh): We might have been the master, but then got
- // partitioned, and now we are finding out once we reconnect
- // that we are no longer the master, so we should just die.
- LOG(INFO) << "New master detected ... maybe it's us!";
+ serve();
+ if (name() == process::TERMINATE) {
+ LOG(INFO) << "Asked to terminate by " << from();
+ foreachpair (_, Slave* slave, slaves) {
+ send(slave->pid, process::TERMINATE);
+ }
break;
+ } else {
+ LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+ << " from: " << from();
}
+ }
+}
- case NO_MASTER_DETECTED: {
- LOG(INFO) << "No master detected ... maybe we're next!";
- break;
- }
- case F2M_REGISTER_FRAMEWORK: {
- FrameworkID fid = newFrameworkId();
- Framework *framework = new Framework(from(), fid, elapsed());
-
- tie(framework->name, framework->user, framework->executorInfo) =
- unpack<F2M_REGISTER_FRAMEWORK>(body());
-
- LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-
- if (framework->executorInfo.uri == "") {
- LOG(INFO) << framework << " registering without an executor URI";
- send(framework->pid, pack<M2F_ERROR>(1, "No executor URI given"));
- delete framework;
- break;
- }
+void Master::initialize()
+{
+ active = false;
- bool rootSubmissions = conf.get<bool>("root_submissions", true);
- if (framework->user == "root" && rootSubmissions == false) {
- LOG(INFO) << framework << " registering as root, but "
- << "root submissions are disabled on this cluster";
- send(framework->pid, pack<M2F_ERROR>(
- 1, "Root is not allowed to submit jobs on this cluster"));
- delete framework;
- break;
- }
+ nextFrameworkId = 0;
+ nextSlaveId = 0;
+ nextOfferId = 0;
- addFramework(framework);
- break;
- }
+ allocatorType = conf.get("allocator", "simple");
- case F2M_REREGISTER_FRAMEWORK: {
- FrameworkID fid;
- string name;
- string user;
- ExecutorInfo executorInfo;
- int32_t generation;
-
- tie(fid, name, user, executorInfo, generation) =
- unpack<F2M_REREGISTER_FRAMEWORK>(body());
-
- if (executorInfo.uri == "") {
- LOG(INFO) << "Framework " << fid << " re-registering "
- << "without an executor URI";
- send(from(), pack<M2F_ERROR>(1, "No executor URI given"));
- break;
- }
+ // Start all the statistics at 0.
+ statistics.launched_tasks = 0;
+ statistics.finished_tasks = 0;
+ statistics.killed_tasks = 0;
+ statistics.failed_tasks = 0;
+ statistics.lost_tasks = 0;
+ statistics.valid_status_updates = 0;
+ statistics.invalid_status_updates = 0;
+ statistics.valid_framework_messages = 0;
+ statistics.invalid_framework_messages = 0;
- if (fid == "") {
- LOG(ERROR) << "Framework re-registering without an id!";
- send(from(), pack<M2F_ERROR>(1, "Missing framework id"));
- break;
- }
+ // Install handler functions for certain messages.
+ install(NEW_MASTER_DETECTED, &Master::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
- LOG(INFO) << "Re-registering framework " << fid << " at " << from();
+ install(NO_MASTER_DETECTED, &Master::noMasterDetected);
- if (frameworks.count(fid) > 0) {
- // Using the "generation" of the scheduler allows us to keep a
- // scheduler that got partitioned but didn't die (in ZooKeeper
- // speak this means didn't lose their session) and then
- // eventually tried to connect to this master even though
- // another instance of their scheduler has reconnected. This
- // might not be an issue in the future when the
- // master/allocator launches the scheduler can get restarted
- // (if necessary) by the master and the master will always
- // know which scheduler is the correct one.
- if (generation == 0) {
- LOG(INFO) << "Framework " << fid << " failed over";
- failoverFramework(frameworks[fid], from());
- // TODO: Should we check whether the new scheduler has given
- // us a different framework name, user name or executor info?
- } else {
- LOG(INFO) << "Framework " << fid << " re-registering with an "
- << "already used id and not failing over!";
- send(from(), pack<M2F_ERROR>(1, "Framework id in use"));
- break;
- }
- } else {
- // We don't have a framework with this ID, so we must be a newly
- // elected Mesos master to which either an existing scheduler or a
- // failed-over one is connecting. Create a Framework object and add
- // any tasks it has that have been reported by reconnecting slaves.
- Framework *framework = new Framework(from(), fid, elapsed());
- framework->name = name;
- framework->user = user;
- framework->executorInfo = executorInfo;
- addFramework(framework);
- // Add any running tasks reported by slaves for this framework.
- foreachpair (SlaveID slaveId, Slave *slave, slaves) {
- foreachpair (_, Task *task, slave->tasks) {
- if (framework->id == task->frameworkId) {
- framework->addTask(task);
- }
- }
- }
- }
+ install(F2M_REGISTER_FRAMEWORK, &Master::registerFramework,
+ &RegisterFrameworkMessage::framework);
- CHECK(frameworks.find(fid) != frameworks.end());
+ install(F2M_REREGISTER_FRAMEWORK, &Master::reregisterFramework,
+ &ReregisterFrameworkMessage::framework_id,
+ &ReregisterFrameworkMessage::framework,
+ &ReregisterFrameworkMessage::generation);
- // Broadcast the new framework pid to all the slaves. We have to
- // broadcast because an executor might be running on a slave but
- // it currently isn't running any tasks. This could be a
- // potential scalability issue ...
- foreachpair (_, Slave *slave, slaves) {
- send(slave->pid, pack<M2S_UPDATE_FRAMEWORK_PID>(fid, from()));
- }
+ install(F2M_UNREGISTER_FRAMEWORK, &Master::unregisterFramework,
+ &UnregisterFrameworkMessage::framework_id);
- break;
- }
+ install(F2M_RESOURCE_OFFER_REPLY, &Master::resourceOfferReply,
+ &ResourceOfferReplyMessage::framework_id,
+ &ResourceOfferReplyMessage::offer_id,
+ &ResourceOfferReplyMessage::tasks,
+ &ResourceOfferReplyMessage::params);
- case F2M_UNREGISTER_FRAMEWORK: {
- FrameworkID fid;
- tie(fid) = unpack<F2M_UNREGISTER_FRAMEWORK>(body());
- LOG(INFO) << "Asked to unregister framework " << fid;
- Framework *framework = lookupFramework(fid);
- if (framework != NULL && framework->pid == from())
- removeFramework(framework);
- else
- LOG(WARNING) << "Non-authoratative PID attempting framework "
- << "unregistration ... ignoring";
- break;
+ install(F2M_REVIVE_OFFERS, &Master::reviveOffers,
+ &ReviveOffersMessage::framework_id);
+
+ install(F2M_KILL_TASK, &Master::killTask,
+ &KillTaskMessage::framework_id,
+ &KillTaskMessage::task_id);
+
+ install(F2M_FRAMEWORK_MESSAGE, &Master::schedulerMessage,
+ &FrameworkMessageMessage::slave_id,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::executor_id,
+ &FrameworkMessageMessage::data);
+
+ install(F2M_STATUS_UPDATE_ACK, &Master::statusUpdateAck,
+ &StatusUpdateAckMessage::framework_id,
+ &StatusUpdateAckMessage::task_id,
+ &StatusUpdateAckMessage::slave_id);
+
+ install(S2M_REGISTER_SLAVE, &Master::registerSlave,
+ &RegisterSlaveMessage::slave);
+
+ install(S2M_REREGISTER_SLAVE, &Master::reregisterSlave,
+ &ReregisterSlaveMessage::slave_id,
+ &ReregisterSlaveMessage::slave,
+ &ReregisterSlaveMessage::tasks);
+
+ install(S2M_UNREGISTER_SLAVE, &Master::unregisterSlave,
+ &UnregisterSlaveMessage::slave_id);
+
+ install(S2M_STATUS_UPDATE, &Master::statusUpdate,
+ &StatusUpdateMessage::framework_id,
+ &StatusUpdateMessage::status);
+
+ install(S2M_FRAMEWORK_MESSAGE, &Master::executorMessage,
+ &FrameworkMessageMessage::slave_id,
+ &FrameworkMessageMessage::framework_id,
+ &FrameworkMessageMessage::executor_id,
+ &FrameworkMessageMessage::data);
+
+ install(S2M_EXITED_EXECUTOR, &Master::exitedExecutor,
+ &ExitedExecutorMessage::slave_id,
+ &ExitedExecutorMessage::framework_id,
+ &ExitedExecutorMessage::executor_id,
+ &ExitedExecutorMessage::result);
+
+ install(process::EXITED, &Master::exited);
+
+ // Install HTTP request handlers.
+ Process<Master>::install("vars", &Master::vars);
+ Process<Master>::install("stats", &Master::stats);
+}
+
+
+void Master::newMasterDetected(const string& pid)
+{
+ // Check and see if we are (1) still waiting to be the active
+ // master, (2) newly active master, (3) no longer active master,
+ // or (4) still active master.
+
+ UPID master(pid);
+
+ if (master != self() && !active) {
+ LOG(INFO) << "Waiting to be master!";
+ } else if (master == self() && !active) {
+ LOG(INFO) << "Acting as master!";
+ active = true;
+ } else if (master != self() && active) {
+ LOG(FATAL) << "No longer active master ... committing suicide!";
+ } else if (master == self() && active) {
+ LOG(INFO) << "Still acting as master!";
+ }
+}
+
+
+void Master::noMasterDetected()
+{
+ if (active) {
+ LOG(FATAL) << "No longer active master ... committing suicide!";
+ } else {
+ LOG(FATAL) << "No master detected (?) ... committing suicide!";
+ }
+}
+
+
+// Handler installed for F2M_REGISTER_FRAMEWORK. Builds a Framework for
+// the sender (from()) using the id returned by newFrameworkId() and
+// adds it to the master via addFramework().
+//
+// The registration is rejected with an M2F_ERROR (code 1) sent back to
+// the sender when either:
+//   - the framework's executor URI is empty, or
+//   - the framework's user is "root" and the "root_submissions"
+//     configuration option is false.
+//
+// A rejected framework is deleted and must NOT reach addFramework():
+// each rejection path returns right after the delete so the dangling
+// pointer is never used and the rejected framework is never added.
+void Master::registerFramework(const FrameworkInfo& frameworkInfo)
+{
+  Framework* framework =
+    new Framework(frameworkInfo, newFrameworkId(), from(), elapsed());
+
+  LOG(INFO) << "Registering " << framework << " at " << from();
+
+  if (framework->info.executor().uri() == "") {
+    LOG(INFO) << framework << " registering without an executor URI";
+    MSG<M2F_ERROR> out;
+    out.set_code(1);
+    out.set_message("No executor URI given");
+    send(from(), out);
+    delete framework;
+    return; // Rejected: do not fall through to addFramework().
+  } else {
+    bool rootSubmissions = conf.get<bool>("root_submissions", true);
+    if (framework->info.user() == "root" && rootSubmissions == false) {
+      LOG(INFO) << framework << " registering as root, but "
+                << "root submissions are disabled on this cluster";
+      MSG<M2F_ERROR> out;
+      out.set_code(1);
+      out.set_message("User 'root' is not allowed to run frameworks");
+      send(from(), out);
+      delete framework;
+      return; // Rejected: do not fall through to addFramework().
     }
+  }
+
+  // Only frameworks that passed both checks are added.
+  addFramework(framework);
+}
+
+
+void Master::reregisterFramework(const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ int32_t generation)
+{
+ if (frameworkId == "") {
+ LOG(ERROR) << "Framework re-registering without an id!";
+ MSG<M2F_ERROR> out;
+ out.set_code(1);
+ out.set_message("Missing framework id");
+ send(from(), out);
+ } else if (frameworkInfo.executor().uri() == "") {
+ LOG(INFO) << "Framework " << frameworkId << " re-registering "
+ << "without an executor URI";
+ MSG<M2F_ERROR> out;
+ out.set_code(1);
+ out.set_message("No executor URI given");
+ send(from(), out);
+ } else {
+ LOG(INFO) << "Re-registering framework " << frameworkId
+ << " at " << from();
+
+ if (frameworks.count(frameworkId) > 0) {
+ // Using the "generation" of the scheduler allows us to keep a
+ // scheduler that got partitioned but didn't die (in ZooKeeper
+ // speak this means didn't lose their session) and then
+ // eventually tried to connect to this master even though
+ // another instance of their scheduler has reconnected. This
+ // might not be an issue in the future when the
+ // master/allocator launches the scheduler can get restarted
+ // (if necessary) by the master and the master will always
+ // know which scheduler is the correct one.
+ if (generation == 0) {
+ LOG(INFO) << "Framework " << frameworkId << " failed over";
+ failoverFramework(frameworks[frameworkId], from());
+ // TODO: Should we check whether the new scheduler has given
+ // us a different framework name, user name or executor info?
+ } else {
+ LOG(INFO) << "Framework " << frameworkId
+ << " re-registering with an already used id "
+ << " and not failing over!";
+ MSG<M2F_ERROR> out;
+ out.set_code(1);
+ out.set_message("Framework id in use");
+ send(from(), out);
+ return;
+ }
+ } else {
+ // We don't have a framework with this ID, so we must be a newly
+ // elected Mesos master to which either an existing scheduler or a
+ // failed-over one is connecting. Create a Framework object and add
+ // any tasks it has that have been reported by reconnecting slaves.
+ Framework* framework =
+ new Framework(frameworkInfo, frameworkId, from(), elapsed());
- case F2M_SLOT_OFFER_REPLY: {
- FrameworkID fid;
- OfferID oid;
- vector<TaskDescription> tasks;
- Params params;
- tie(fid, oid, tasks, params) = unpack<F2M_SLOT_OFFER_REPLY>(body());
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- SlotOffer *offer = lookupSlotOffer(oid);
- if (offer != NULL) {
- processOfferReply(offer, tasks, params);
- } else {
- // The slot offer is gone, meaning that we rescinded it or that
- // the slave was lost; immediately report any tasks in it as lost
- foreach (const TaskDescription &t, tasks) {
- send(framework->pid,
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ // TODO(benh): Check for root submissions like above!
+
+ addFramework(framework);
+ // Add any running tasks reported by slaves for this framework.
+ foreachpair (const SlaveID& slaveId, Slave* slave, slaves) {
+ foreachpair (_, Task* task, slave->tasks) {
+ if (framework->frameworkId == task->framework_id()) {
+ framework->addTask(task);
}
}
}
- break;
}
- case F2M_REVIVE_OFFERS: {
- FrameworkID fid;
- tie(fid) = unpack<F2M_REVIVE_OFFERS>(body());
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- LOG(INFO) << "Reviving offers for " << framework;
- framework->slaveFilter.clear();
- allocator->offersRevived(framework);
- }
- break;
+ CHECK(frameworks.count(frameworkId) > 0);
+
+ // Broadcast the new framework pid to all the slaves. We have to
+ // broadcast because an executor might be running on a slave but
+ // it currently isn't running any tasks. This could be a
+ // potential scalability issue ...
+ foreachpair (_, Slave* slave, slaves) {
+ MSG<M2S_UPDATE_FRAMEWORK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.set_pid(from());
+ send(slave->pid, out);
}
+ }
+}
- case F2M_KILL_TASK: {
- FrameworkID fid;
- TaskID tid;
- tie(fid, tid) = unpack<F2M_KILL_TASK>(body());
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- Task *task = framework->lookupTask(tid);
- if (task != NULL) {
- LOG(INFO) << "Asked to kill " << task << " by its framework";
- killTask(task);
- } else {
- LOG(INFO) << "Asked to kill UNKNOWN task by its framework";
- send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
- }
- }
- break;
+
+void Master::unregisterFramework(const FrameworkID& frameworkId)
+{
+ LOG(INFO) << "Asked to unregister framework " << frameworkId;
+
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ if (framework->pid == from()) {
+ removeFramework(framework);
+ } else {
+ LOG(WARNING) << from() << " tried to unregister framework; "
+ << "expecting " << framework->pid;
}
+ }
+}
+
- case F2M_FRAMEWORK_MESSAGE: {
- FrameworkID fid;
- FrameworkMessage message;
- tie(fid, message) = unpack<F2M_FRAMEWORK_MESSAGE>(body());
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- Slave *slave = lookupSlave(message.slaveId);
- if (slave != NULL)
- send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+void Master::resourceOfferReply(const FrameworkID& frameworkId,
+ const OfferID& offerId,
+ const vector<TaskDescription>& tasks,
+ const Params& params)
+{
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ SlotOffer* offer = lookupSlotOffer(offerId);
+ if (offer != NULL) {
+ processOfferReply(offer, tasks, params);
+ } else {
+ // The slot offer is gone, meaning that we rescinded it, it
+ // has already been replied to, or that the slave was lost;
+ // immediately report any tasks in it as lost (it would
+ // probably be better to have better error messages here).
+ foreach (const TaskDescription& task, tasks) {
+ MSG<M2F_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(task.task_id());
+ status->mutable_slave_id()->MergeFrom(task.slave_id());
+ status->set_state(TASK_LOST);
+ send(framework->pid, out);
}
- break;
}
+ }
+}
- case S2M_REGISTER_SLAVE: {
- string slaveId = masterId + "-" + lexical_cast<string>(nextSlaveId++);
- Slave *slave = new Slave(from(), slaveId, elapsed());
- tie(slave->hostname, slave->publicDns, slave->resources) =
- unpack<S2M_REGISTER_SLAVE>(body());
- LOG(INFO) << "Registering " << slave << " at " << slave->pid;
- slaves[slave->id] = slave;
- pidToSid[slave->pid] = slave->id;
- link(slave->pid);
- send(slave->pid,
- pack<M2S_REGISTER_REPLY>(slave->id, HEARTBEAT_INTERVAL));
- allocator->slaveAdded(slave);
- break;
- }
- case S2M_REREGISTER_SLAVE: {
- Slave *slave = new Slave(from(), "", elapsed());
- vector<Task> tasks;
- tie(slave->id, slave->hostname, slave->publicDns,
- slave->resources, tasks) = unpack<S2M_REREGISTER_SLAVE>(body());
-
- if (slave->id == "") {
- slave->id = masterId + "-" + lexical_cast<string>(nextSlaveId++);
- LOG(ERROR) << "Slave re-registered without a SlaveID, "
- << "generating a new id for it.";
- }
+void Master::reviveOffers(const FrameworkID& frameworkId)
+{
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Reviving offers for " << framework;
+ framework->slaveFilter.clear();
+ allocator->offersRevived(framework);
+ }
+}
- LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
- slaves[slave->id] = slave;
- pidToSid[slave->pid] = slave->id;
- link(slave->pid);
- send(slave->pid,
- pack<M2S_REREGISTER_REPLY>(slave->id, HEARTBEAT_INTERVAL));
-
- allocator->slaveAdded(slave);
-
- foreach (const Task &t, tasks) {
- Task *task = new Task(t);
- slave->addTask(task);
-
- // Tell this slave the current framework pid for this task.
- Framework *framework = lookupFramework(task->frameworkId);
- if (framework != NULL) {
- framework->addTask(task);
- send(slave->pid, pack<M2S_UPDATE_FRAMEWORK_PID>(framework->id,
- framework->pid));
- }
- }
- // TODO(benh|alig): We should put a timeout on how long we keep
- // tasks running that never have frameworks reregister that
- // claim them.
+void Master::killTask(const FrameworkID& frameworkId, // framework that owns the task
+ const TaskID& taskId) // task the framework wants killed
+{ // Forwards a kill request to the task's slave, or replies TASK_LOST when the task is unknown.
+ LOG(INFO) << "Asked to kill task " << taskId
+ << " of framework " << frameworkId;
- break;
- }
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) { // a request naming an unknown framework gets no reply
+ Task* task = framework->lookupTask(taskId);
+ if (task != NULL) {
+ Slave* slave = lookupSlave(task->slave_id());
+ CHECK(slave != NULL); // aborts if the task's slave id does not resolve to a known slave
- case S2M_UNREGISTER_SLAVE: {
- SlaveID sid;
- tie(sid) = unpack<S2M_UNREGISTER_SLAVE>(body());
- LOG(INFO) << "Asked to unregister slave " << sid;
- Slave *slave = lookupSlave(sid);
- if (slave != NULL)
- removeSlave(slave);
- break;
+ LOG(INFO) << "Telling slave " << slave->slaveId
+ << " to kill task " << taskId
+ << " of framework " << frameworkId;
+
+ MSG<M2S_KILL_TASK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, out);
+ } else { // unknown task: report it to the framework as lost
+ LOG(WARNING) << "Cannot kill task " << taskId
+ << " of framework " << frameworkId
+ << " because it cannot be found";
+ MSG<M2F_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus *status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(taskId);
+ status->mutable_slave_id()->set_value("UNKNOWN"); // placeholder: no slave was looked up on this path
+ status->set_state(TASK_LOST);
+ send(framework->pid, out);
}
+ }
+}
- case S2M_STATUS_UPDATE: {
- SlaveID sid;
- FrameworkID fid;
- TaskID tid;
- TaskState state;
- string data;
- tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
-
- if (Slave *slave = lookupSlave(sid)) {
- if (Framework *framework = lookupFramework(fid)) {
- // Pass on the (transformed) status update to the framework.
- forward(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
- if (duplicate()) {
- LOG(WARNING) << "Locally ignoring duplicate message with id:" << seq();
- break;
- }
- // Update the task state locally.
- Task *task = slave->lookupTask(fid, tid);
- if (task != NULL) {
- LOG(INFO) << "Status update: " << task << " is in state " << state;
- task->state = state;
- // Remove the task if it finished or failed
- if (state == TASK_FINISHED || state == TASK_FAILED ||
- state == TASK_KILLED || state == TASK_LOST) {
- LOG(INFO) << "Removing " << task << " because it's done";
- removeTask(task, TRR_TASK_ENDED);
- }
- }
- } else {
- LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup "
- << "framework id " << fid;
- }
- } else {
- LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup slave id "
- << sid;
- }
- break;
+
+void Master::schedulerMessage(const SlaveID& slaveId, // slave the message is addressed to
+ const FrameworkID& frameworkId, // framework the message belongs to
+ const ExecutorID& executorId, // copied into the outgoing message
+ const string& data) // opaque payload, forwarded unchanged
+{ // Relays a framework message to a slave; both the framework and the slave must be known.
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ Slave* slave = lookupSlave(slaveId);
+ if (slave != NULL) {
+ LOG(INFO) << "Sending framework message for framework "
+ << frameworkId << " to slave " << slaveId;
+ MSG<M2S_FRAMEWORK_MESSAGE> out;
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_executor_id()->MergeFrom(executorId);
+ out.set_data(data);
+ send(slave->pid, out);
+ } else { // unknown slave: the message is dropped with a warning
+ LOG(WARNING) << "Cannot send framework message for framework "
+ << frameworkId << " to slave " << slaveId
+ << " because slave does not exist";
}
+ } else { // unknown framework: the message is dropped with a warning
+ LOG(WARNING) << "Cannot send framework message for framework "
+ << frameworkId << " to slave " << slaveId
+ << " because framework does not exist";
+ }
+}
- case S2M_FRAMEWORK_MESSAGE: {
- SlaveID sid;
- FrameworkID fid;
- FrameworkMessage message;
- tie(sid, fid, message) = unpack<S2M_FRAMEWORK_MESSAGE>(body());
- Slave *slave = lookupSlave(sid);
- if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL)
- send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
- }
- break;
+
+void Master::statusUpdateAck(const FrameworkID& frameworkId, // framework acknowledging the update
+ const TaskID& taskId, // task whose status update is acknowledged
+ const SlaveID& slaveId) // slave that should receive the acknowledgement
+{ // Forwards a status update acknowledgement to a slave; both framework and slave must be known.
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ Slave* slave = lookupSlave(slaveId);
+ if (slave != NULL) {
+ LOG(INFO) << "Sending slave " << slaveId
+ << " status update acknowledgement for task " << taskId
+ << " of framework " << frameworkId;
+ MSG<M2S_STATUS_UPDATE_ACK> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_slave_id()->MergeFrom(slaveId);
+ out.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, out);
+ } else { // unknown slave: the acknowledgement is dropped with a warning
+ LOG(WARNING) << "Cannot tell slave " << slaveId
+ << " of status update acknowledgement for task " << taskId
+ << " of framework " << frameworkId
+ << " because slave does not exist";
}
+ } else { // unknown framework: the acknowledgement is dropped with a warning
+ LOG(WARNING) << "Cannot tell slave " << slaveId
+ << " of status update acknowledgement for task " << taskId
+ << " of framework " << frameworkId
+ << " because framework does not exist";
+ }
+}
- case S2M_LOST_EXECUTOR: {
- SlaveID sid;
- FrameworkID fid;
- int32_t status;
- tie(sid, fid, status) = unpack<S2M_LOST_EXECUTOR>(body());
- Slave *slave = lookupSlave(sid);
- if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- // TODO(benh): Send the framework it's executor's exit status?
- if (status == -1) {
- LOG(INFO) << "Executor on " << slave << " (" << slave->hostname
- << ") disconnected";
- } else {
- LOG(INFO) << "Executor on " << slave << " (" << slave->hostname
- << ") exited with status " << status;
- }
- // Collect all the lost tasks for this framework.
- set<Task*> tasks;
- foreachpair (_, Task* task, framework->tasks)
- if (task->slaveId == slave->id)
- tasks.insert(task);
-
- // Tell the framework they have been lost and remove them.
- foreach (Task* task, tasks) {
- send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
- task->message));
+// Handles a registration request from a new slave.
+//
+// A Slave object with a freshly generated id is created for the sender and
+// is handed to SlaveRegistrar::run (through process::run) when either
+//   - its hostname:port is already in slaveHostnamePorts, or
+//   - the "slaves" configuration value is "*", in which case the registrar
+//     first adds the slave to the slaves manager.
+// Any other sender is told to terminate and the Slave object is discarded.
+void Master::registerSlave(const SlaveInfo& slaveInfo)
+{
+  Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsed());
+
+  LOG(INFO) << "Attempting to register slave " << slave->slaveId
+            << " at " << slave->pid;
+
+  struct SlaveRegistrar
+  {
+    // Hands the slave over to the master via Master::addSlave.
+    static bool run(Slave* slave, const PID<Master>& master)
+    {
+      // TODO(benh): Do a reverse lookup to ensure IP maps to
+      // hostname, or check credentials of this slave.
+      process::dispatch(master, &Master::addSlave, slave);
+
+      // The function is declared to return bool; flowing off the end
+      // without a value is undefined behaviour.
+      return true;
+    }
+
+    // Adds the slave to the slaves manager first; on failure the slave
+    // object is destroyed here and never reaches the master.
+    static bool run(Slave* slave,
+                    const PID<Master>& master,
+                    const PID<SlavesManager>& slavesManager)
+    {
+      if (!process::call(slavesManager, &SlavesManager::add,
+                         slave->info.hostname(), slave->pid.port)) {
+        LOG(WARNING) << "Could not register slave because failed"
+                     << " to add it to the slaves manager";
+        delete slave;
+        return false;
+      }
+
+      return run(slave, master);
+    }
+  };
+
+  // Check whether this slave can be accepted, or if all slaves are accepted.
+  if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
+    process::run(&SlaveRegistrar::run, slave, self());
+  } else if (conf.get<string>("slaves", "*") == "*") {
+    process::run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+  } else {
+    LOG(WARNING) << "Cannot register slave at "
+                 << slaveInfo.hostname() << ":" << from().port
+                 << " because not in allocated set of slaves!";
+    send(from(), process::TERMINATE);
+
+    // No registrar took ownership of the slave object, so release it here.
+    delete slave;
+  }
+}
+
+
+// Handles a slave that asks to re-register under an id it already holds,
+// bringing along the tasks it reports.
+//
+// The sender is told to terminate when it supplies an empty id, when the id
+// is already in use by a known slave, or when it is not acceptable (neither
+// listed in slaveHostnamePorts nor covered by a "slaves" configuration value
+// of "*"). Otherwise a Slave object is created and handed to
+// SlaveReregistrar::run (through process::run), which dispatches
+// Master::readdSlave.
+void Master::reregisterSlave(const SlaveID& slaveId,
+                             const SlaveInfo& slaveInfo,
+                             const vector<Task>& tasks)
+{
+  if (slaveId == "") {
+    LOG(ERROR) << "Slave re-registered without an id!";
+    send(from(), process::TERMINATE);
+  } else {
+    Slave* slave = lookupSlave(slaveId);
+    if (slave != NULL) {
+      // TODO(benh): It's still unclear whether or not
+      // MasterDetector::detectMaster will cause spurious
+      // Slave::newMasterDetected to get invoked even though the
+      // ephemeral znode hasn't changed. If that does happen, the
+      // re-register that the slave is trying to do is just
+      // bogus. Letting it re-register might not be all that bad now,
+      // but maybe in the future it's bad because during that
+      // "disconnected" time it might not have received certain
+      // messages from us (like launching a task), and so until we
+      // have some form of task reconciliation between all the
+      // different components, the safe thing to do is have the slave
+      // restart (kind of defeats the purpose of session expiration
+      // support in ZooKeeper if the spurious calls happen each time).
+      LOG(ERROR) << "Slave at " << from()
+                 << " attempted to re-register with an already in use id ("
+                 << slaveId << ")";
+      send(from(), process::TERMINATE);
+    } else {
+      Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsed());
+
+      LOG(INFO) << "Attempting to re-register slave " << slave->slaveId
+                << " at " << slave->pid;
+
+      struct SlaveReregistrar
+      {
+        // Hands the slave and its tasks to the master via
+        // Master::readdSlave.
+        static bool run(Slave* slave,
+                        const vector<Task>& tasks,
+                        const PID<Master>& master)
+        {
+          // TODO(benh): Do a reverse lookup to ensure IP maps to
+          // hostname, or check credentials of this slave.
+          process::dispatch(master, &Master::readdSlave, slave, tasks);
+
+          // The function is declared to return bool; flowing off the
+          // end without a value is undefined behaviour.
+          return true;
+        }
- LOG(INFO) << "Removing " << task << " because of lost executor";
- removeTask(task, TRR_EXECUTOR_LOST);
+        // Adds the slave to the slaves manager first; on failure the
+        // slave object is destroyed here and never reaches the master.
+        static bool run(Slave* slave,
+                        const vector<Task>& tasks,
+                        const PID<Master>& master,
+                        const PID<SlavesManager>& slavesManager)
+        {
+          if (!process::call(slavesManager, &SlavesManager::add,
+                             slave->info.hostname(), slave->pid.port)) {
+            LOG(WARNING) << "Could not register slave because failed"
+                         << " to add it to the slaves manager";
+            delete slave;
+            return false;
}
- // TODO(benh): Might we still want something like M2F_EXECUTOR_LOST?
+          return run(slave, tasks, master);
}
- }
- break;
- }
+      };
- case SH2M_HEARTBEAT: {
- SlaveID sid;
- tie(sid) = unpack<SH2M_HEARTBEAT>(body());
- Slave *slave = lookupSlave(sid);
- if (slave != NULL) {
- slave->lastHeartbeat = elapsed();
+      // Check whether this slave can be accepted, or if all slaves are accepted.
+      if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
+        process::run(&SlaveReregistrar::run, slave, tasks, self());
+      } else if (conf.get<string>("slaves", "*") == "*") {
+        process::run(&SlaveReregistrar::run,
+                     slave, tasks, self(), slavesManager->self());
} else {
- LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
- << " from " << from();
+        LOG(WARNING) << "Cannot re-register slave at "
+                     << slaveInfo.hostname() << ":" << from().port
+                     << " because not in allocated set of slaves!";
+        send(from(), process::TERMINATE);
+
+        // No registrar took ownership of the slave object, so release
+        // it here.
+        delete slave;
}
- break;
}
+  }
+}
- case M2M_TIMER_TICK: {
- unordered_map<SlaveID, Slave *> slavesCopy = slaves;
- foreachpair (_, Slave *slave, slavesCopy) {
- if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
- LOG(INFO) << slave << " missing heartbeats ... considering disconnected";
- removeSlave(slave);
+
+// Handles a request to unregister the slave with the given id. The request
+// is always logged; a slave id the master does not know is otherwise ignored.
+void Master::unregisterSlave(const SlaveID& slaveId)
+{
+  LOG(INFO) << "Asked to unregister slave " << slaveId;
+
+  // TODO(benh): Check that only the slave is asking to unregister?
+
+  Slave* registered = lookupSlave(slaveId);
+  if (registered == NULL) {
+    return;
+  }
+
+  removeSlave(registered);
+}
+
+
+void Master::statusUpdate(const FrameworkID& frameworkId, // framework that owns the task
+ const TaskStatus& status) // carries the task id, slave id and new state
+{ // Forwards a task status update to its framework and updates the master's own task bookkeeping.
+ LOG(INFO) << "Status update: task " << status.task_id()
+ << " of framework " << frameworkId
+ << " is now in state "
+ << TaskState_descriptor()->FindValueByNumber(status.state())->name(); // NOTE(review): FindValueByNumber() yields NULL for an undeclared number; assumes state() is always a declared TaskState
+
+ Slave* slave = lookupSlave(status.slave_id());
+ if (slave != NULL) {
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ // Pass on the (transformed) status update to the framework.
+ MSG<M2F_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(frameworkId);
+ out.mutable_status()->MergeFrom(status);
+ send(framework->pid, out); // sent before the task lookup, so the framework hears about tasks the master no longer tracks
+
+ // Lookup the task and see if we need to update anything locally.
+ Task* task = slave->lookupTask(frameworkId, status.task_id());
+ if (task != NULL) {
+ task->set_state(status.state());
+
+ // Remove the task if necessary, and update statistics.
+ switch (status.state()) { // only the four states below remove the task; any other state just keeps the updated state
+ case TASK_FINISHED:
+ statistics.finished_tasks++;
+ removeTask(task, TRR_TASK_ENDED);
+ break;
+ case TASK_FAILED:
+ statistics.failed_tasks++;
+ removeTask(task, TRR_TASK_ENDED);
+ break;
+ case TASK_KILLED:
+ statistics.killed_tasks++;
+ removeTask(task, TRR_TASK_ENDED);
+ break;
+ case TASK_LOST:
+ statistics.lost_tasks++;
+ removeTask(task, TRR_TASK_ENDED);
+ break;
}
+
+ statistics.valid_status_updates++; // counted only when slave, framework and task were all found
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "task " << status.task_id();
+ statistics.invalid_status_updates++;
+ }
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup "
+ << "framework " << frameworkId;
+ statistics.invalid_status_updates++;
+ }
+ } else {
+ LOG(WARNING) << "Status update error: couldn't lookup slave "
+ << status.slave_id();
+ statistics.invalid_status_updates++;
+ }
+}
+
+
+// Relays a framework message that arrived from slave `slaveId` to the
+// framework `frameworkId`. Every call is counted in the master statistics
+// as either a valid or an invalid framework message.
+void Master::executorMessage(const SlaveID& slaveId,
+                             const FrameworkID& frameworkId,
+                             const ExecutorID& executorId,
+                             const string& data)
+{
+  // The slave is checked before the framework; either failure is logged
+  // and counted as an invalid framework message.
+  Slave* sender = lookupSlave(slaveId);
+  if (sender == NULL) {
+    LOG(WARNING) << "Cannot send framework message from slave "
+                 << slaveId << " to framework " << frameworkId
+                 << " because slave does not exist";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  Framework* recipient = lookupFramework(frameworkId);
+  if (recipient == NULL) {
+    LOG(WARNING) << "Cannot send framework message from slave "
+                 << slaveId << " to framework " << frameworkId
+                 << " because framework does not exist";
+    statistics.invalid_framework_messages++;
+    return;
+  }
+
+  LOG(INFO) << "Sending framework message from slave " << slaveId
+            << " to framework " << frameworkId;
+
+  // NOTE(review): the message id is M2S_* although the recipient is a
+  // framework pid -- confirm whether an M2F_* id was intended here.
+  MSG<M2S_FRAMEWORK_MESSAGE> out;
+  out.mutable_slave_id()->MergeFrom(slaveId);
+  out.mutable_framework_id()->MergeFrom(frameworkId);
+  out.mutable_executor_id()->MergeFrom(executorId);
+  out.set_data(data);
+  send(recipient->pid, out);
+
+  statistics.valid_framework_messages++;
+}
+
+
+void Master::exitedExecutor(const SlaveID& slaveId, // slave the executor ran on
+ const FrameworkID& frameworkId, // framework the executor belonged to
+ const ExecutorID& executorId, // executor that exited
+ int32_t result) // reported result; only logged here
+{ // Reports every task of the exited executor as TASK_LOST to its framework and removes it.
+ Slave* slave = lookupSlave(slaveId);
+ if (slave != NULL) { // an unknown slave or framework is ignored without logging
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Executor " << executorId
+ << " of framework " << framework->frameworkId
+ << " on slave " << slave->slaveId
+ << " (" << slave->info.hostname() << ") "
+ << "exited with result " << result;
+
+ // Tell the framework which tasks have been lost.
+ foreachpaircopy (_, Task* task, framework->tasks) { // presumably iterates a copy so removeTask() below may modify framework->tasks -- confirm macro semantics
+ if (task->slave_id() == slave->slaveId &&
+ task->executor_id() == executorId) {
+ MSG<M2F_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(task->framework_id());
+ TaskStatus* status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(task->task_id());
+ status->mutable_slave_id()->MergeFrom(task->slave_id());
+ status->set_state(TASK_LOST);
+ send(framework->pid, out);
+
+ LOG(INFO) << "Removing task " << task->task_id()
+ << " of framework " << frameworkId
+ << " because of lost executor";
+
+ removeTask(task, TRR_EXECUTOR_LOST);
+ }
}
- // Check which framework filters can be expired.
- foreachpair (_, Framework *framework, frameworks)
- framework->removeExpiredFilters(elapsed());
-
- // Do allocations!
- allocator->timerTick();
-
- // int cnts = 0;
- // foreachpair(_, Framework *framework, frameworks) {
- // VLOG(1) << (cnts++) << " resourceInUse:" << framework->resources;
- // }
- break;
+ // TODO(benh): Send the framework its executor's exit
+ // status? Or maybe at least have something like
+ // M2F_EXECUTOR_LOST?
}
+ }
+}
+
- case M2M_FRAMEWORK_EXPIRED: {
- FrameworkID fid;
- tie(fid) = unpack<M2M_FRAMEWORK_EXPIRED>(body());
- if (Framework *framework = lookupFramework(fid)) {
- LOG(INFO) << "Framework failover timer expired, removing framework "
- << framework;
- removeFramework(framework);
+// Adds hostname:port to slaveHostnamePorts, the set consulted when a slave
+// tries to register, and logs that the endpoint is now considered active.
+void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
+{
+  LOG(INFO) << "Master now considering a slave at "
+            << hostname << ":" << port << " as active";
+
+  slaveHostnamePorts.insert(hostname, port);
+}
+
+
+void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port) // endpoint that is no longer allowed to host a slave
+{ // Removes hostname:port from the activated set, terminating and removing a connected slave at that endpoint.
+ if (slaveHostnamePorts.count(hostname, port) > 0) { // endpoints that were never activated are ignored
+ // Look for a connected slave and remove it.
+ foreachpair (_, Slave* slave, slaves) {
+ if (slave->info.hostname() == hostname && slave->pid.port == port) {
+ LOG(WARNING) << "Removing slave " << slave->slaveId << " at "
+ << hostname << ":" << port
+ << " because it has been deactivated";
+ send(slave->pid, process::TERMINATE);
+ removeSlave(slave);
+ break; // only the first match is handled; presumably removeSlave() modifies `slaves`, so iteration must stop -- confirm
}
- break;
}
- case PROCESS_EXIT: {
- // TODO(benh): Could we get PROCESS_EXIT from a network partition?
- LOG(INFO) << "Process exited: " << from();
- if (pidToFid.find(from()) != pidToFid.end()) {
- FrameworkID fid = pidToFid[from()];
- if (Framework *framework = lookupFramework(fid)) {
- LOG(INFO) << framework << " disconnected";
-// framework->failoverTimer = new FrameworkFailoverTimer(self(), fid);
-// link(spawn(framework->failoverTimer));
- removeFramework(framework);
- }
- } else if (pidToSid.find(from()) != pidToSid.end()) {
- SlaveID sid = pidToSid[from()];
- if (Slave *slave = lookupSlave(sid)) {
- LOG(INFO) << slave << " disconnected";
- removeSlave(slave);
- }
- } else {
- foreachpair (_, Framework *framework, frameworks) {
- if (framework->failoverTimer != NULL &&
- framework->failoverTimer->self() == from()) {
- LOG(INFO) << "Lost framework failover timer, removing framework "
- << framework;
- removeFramework(framework);
- break;
- }
- }
+ LOG(INFO) << "Master now considering a slave at "
+ << hostname << ":" << port << " as inactive";
+ slaveHostnamePorts.erase(hostname, port);
+ }
+}
+
+
+// Periodic housekeeping: every framework drops the filters that have
+// expired by now, after which the allocator receives its own timer tick.
+void Master::timerTick()
+{
+  // Check which framework filters can be expired.
+  foreachpair (_, Framework* current, frameworks) {
+    current->removeExpiredFilters(elapsed());
+  }
+
+  // Do allocations!
+  allocator->timerTick();
+}
+
+
+// Called when a framework's failover timer has expired: the framework is
+// removed if the master still knows it, otherwise nothing happens.
+void Master::frameworkExpired(const FrameworkID& frameworkId)
+{
+  Framework* expired = lookupFramework(frameworkId);
+  if (expired == NULL) {
+    return;
+  }
+
+  LOG(INFO) << "Framework failover timer expired, removing "
+            << expired;
+  removeFramework(expired);
+}
+
+
+void Master::exited()
+{ // Reacts to the exit of the process returned by from(): a framework, a slave, or a framework failover timer.
+ // TODO(benh): Could we get PROCESS_EXIT from a network partition?
+ LOG(INFO) << "Process exited: " << from();
+ if (pidToFrameworkId.count(from()) > 0) { // case 1: the exited process is a registered framework
+ const FrameworkID& frameworkId = pidToFrameworkId[from()];
+ Framework* framework = lookupFramework(frameworkId);
+ if (framework != NULL) {
+ LOG(INFO) << "Framework " << frameworkId << " disconnected";
+
+ // Stop sending offers here for now.
+ framework->active = false;
+
+ // Remove the framework's slot offers.
+ foreachcopy (SlotOffer* offer, framework->slotOffers) { // presumably iterates a copy because removeSlotOffer() edits framework->slotOffers -- confirm macro semantics
+ removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
- break;
- }
- case M2M_GET_STATE: {
- send(from(), pack<M2M_GET_STATE_REPLY>(getState()));
- break;
+ framework->failoverTimer =
+ new FrameworkFailoverTimer(self(), frameworkId);
+ link(spawn(framework->failoverTimer)); // the framework is kept and a failover timer is started instead of removing it right away
+// removeFramework(framework);
}
-
- case M2M_SHUTDOWN: {
- LOG(INFO) << "Asked to shut down by " << from();
- foreachpair (_, Slave *slave, slaves)
- send(slave->pid, pack<M2S_SHUTDOWN>());
- return;
+ } else if (pidToSlaveId.count(from()) > 0) { // case 2: the exited process is a registered slave
+ const SlaveID& slaveId = pidToSlaveId[from()];
+ Slave* slave = lookupSlave(slaveId);
+ if (slave != NULL) {
+ LOG(INFO) << slave << " disconnected";
+ removeSlave(slave);
}
-
- default:
- LOG(ERROR) << "Received unknown MSGID " << msgid() << " from " << from();
- break;
+ } else { // case 3: neither; check whether it was some framework's failover timer
+ foreachpair (_, Framework* framework, frameworks) {
+ if (framework->failoverTimer != NULL &&
+ framework->failoverTimer->self() == from()) {
+ LOG(ERROR) << "Bad framework failover timer, removing "
+ << framework;
+ removeFramework(framework);
+ break; // stop after the first match; removeFramework() was just called on an entry of `frameworks`
+ }
}
}
}
-OfferID Master::makeOffer(Framework *framework,
+// HTTP handler that reports build information, the number of frameworks
+// and every configuration key/value pair as plain text, one entry per line
+// in "name value" form.
+Promise<HttpResponse> Master::vars(const HttpRequest& request)
+{
+  LOG(INFO) << "Request for 'vars'";
+
+  ostringstream text;
+  text << "build_date " << build::DATE << "\n";
+  text << "build_user " << build::USER << "\n";
+  text << "build_flags " << build::FLAGS << "\n";
+  text << "frameworks_count " << frameworks.size() << "\n";
+
+  // Also add the configuration values.
+  foreachpair (const string& name, const string& setting, conf.getMap()) {
+    text << name << " " << setting << "\n";
+  }
+
+  // Render the stream once so that the length header and the body are
+  // taken from the same string.
+  const string payload = text.str();
+
+  HttpOKResponse reply;
+  reply.headers["Content-Type"] = "text/plain";
+  reply.headers["Content-Length"] = lexical_cast<string>(payload.size());
+  reply.body = payload.data();
+  return reply;
+}
+
+
+// HTTP handler that renders the master's counters (frameworks, slaves and
+// the task / status update / framework message statistics) as a single
+// JSON object.
+Promise<HttpResponse> Master::stats(const HttpRequest& request)
+{
+  LOG(INFO) << "Request for 'stats'";
+
+  ostringstream json;
+  json << "{";
+  json << "\"total_schedulers\":" << frameworks.size() << ",";
+  json << "\"active_schedulers\":" << getActiveFrameworks().size() << ",";
+  json << "\"activated_slaves\":" << slaveHostnamePorts.size() << ",";
+  json << "\"connected_slaves\":" << slaves.size() << ",";
+  json << "\"launched_tasks\":" << statistics.launched_tasks << ",";
+  json << "\"finished_tasks\":" << statistics.finished_tasks << ",";
+  json << "\"killed_tasks\":" << statistics.killed_tasks << ",";
+  json << "\"failed_tasks\":" << statistics.failed_tasks << ",";
+  json << "\"lost_tasks\":" << statistics.lost_tasks << ",";
+  json << "\"valid_status_updates\":"
+       << statistics.valid_status_updates << ",";
+  json << "\"invalid_status_updates\":"
+       << statistics.invalid_status_updates << ",";
+  json << "\"valid_framework_messages\":"
+       << statistics.valid_framework_messages << ",";
+  json << "\"invalid_framework_messages\":"
+       << statistics.invalid_framework_messages;
+  json << "}";
+
+  // Render the stream once so that the length header and the body are
+  // taken from the same string.
+  const string payload = json.str();
+
+  HttpOKResponse reply;
+  reply.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+  reply.headers["Content-Length"] = lexical_cast<string>(payload.size());
+  reply.body = payload.data();
+  return reply;
+}
+
+
+OfferID Master::makeOffer(Framework* framework, // framework that receives the offer
const vector<SlaveResources>& resources)
{
- OfferID oid = masterId + "-" + lexical_cast<string>(nextSlotOfferId++);
+ const OfferID& offerId = newOfferId(); // id for the new offer; also the return value of this function
+
+ SlotOffer* offer = new SlotOffer(offerId, framework->frameworkId, resources);
- SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
- slotOffers[offer->id] = offer;
+ slotOffers[offer->offerId] = offer; // the offer is recorded in the master's table and, below, in the framework
framework->addOffer(offer);
+
+ // Update the resource information within each of the slave objects. Gross!
foreach (const SlaveResources& r, resources) {
r.slave->slotOffers.insert(offer);
r.slave->resourcesOffered += r.resources;
}
- LOG(INFO) << "Sending " << offer << " to " << framework;
- vector<SlaveOffer> offers;
- map<SlaveID, PID> pids;
+
+ LOG(INFO) << "Sending offer " << offer->offerId
+ << " to framework " << framework->frameworkId;
+
+ MSG<M2F_RESOURCE_OFFER> out;
+ out.mutable_offer_id()->MergeFrom(offerId);
+
foreach (const SlaveResources& r, resources) {
- Params params;
- params.set("cpus", r.resources.cpus);
- params.set("mem", r.resources.mem);
- SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
- offers.push_back(offer);
- pids[r.slave->id] = r.slave->pid;
+ SlaveOffer* offer = out.add_offers(); // note: shadows the SlotOffer* `offer` declared above
+ offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
+ offer->set_hostname(r.slave->info.hostname());
+ offer->mutable_resources()->MergeFrom(r.resources);
+
+ out.add_pids(r.slave->pid); // one pid is added per offer entry, in the same order
}
- send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers, pids));
- return oid;
+
+ send(framework->pid, out);
+
+ return offerId;
}
// Process a resource offer reply (for a non-cancelled offer) by launching
// the desired tasks (if the offer contains a valid set of tasks) and
-// reporting any unused resources to the allocator
-void Master::processOfferReply(SlotOffer *offer,
- const vector<TaskDescription>& tasks, const Params& params)
+// reporting any unused resources to the allocator.
+//
+// The whole reply is validated before anything is launched. The framework
+// is terminated (and nothing is launched) when a task names a slave that
+// was not part of the offer, carries no or non-allocatable resources, uses
+// more than was offered on a slave, or reuses a task id.
+//
+// `params` may carry a "timeout" entry that overrides
+// DEFAULT_REFUSAL_TIMEOUT for the filters placed on slaves whose resources
+// were left completely unused; a value that is not an integer is ignored.
+void Master::processOfferReply(SlotOffer* offer,
+                               const vector<TaskDescription>& tasks,
+                               const Params& params)
{
LOG(INFO) << "Received reply for " << offer;
- Framework *framework = lookupFramework(offer->frameworkId);
+  Framework* framework = lookupFramework(offer->frameworkId);
CHECK(framework != NULL);
- // Count resources in the offer
- unordered_map<Slave *, Resources> offerResources;
- foreach (SlaveResources &r, offer->resources) {
- offerResources[r.slave] = r.resources;
- }
-
- // Count resources in the response, and check that its tasks are valid
- unordered_map<Slave *, Resources> responseResources;
- foreach (const TaskDescription &t, tasks) {
- // Check whether this task size is valid
- Params params(t.params);
- Resources res(params.getInt32("cpus", -1),
- params.getInt32("mem", -1));
- if (res.cpus < MIN_CPUS || res.mem < MIN_MEM ||
- res.cpus > MAX_CPUS || res.mem > MAX_MEM) {
- terminateFramework(framework, 0,
- "Invalid task size: " + lexical_cast<string>(res));
+  // Count resources in the offer.
+  unordered_map<Slave*, Resources> resourcesOffered;
+  foreach (const SlaveResources& r, offer->resources) {
+    resourcesOffered[r.slave] = r.resources;
+  }
+
+  // Count used resources and check that its tasks are valid.
+  unordered_map<Slave*, Resources> resourcesUsed;
+  foreach (const TaskDescription& task, tasks) {
+    // Check whether the task is on a valid slave.
+    Slave* slave = lookupSlave(task.slave_id());
+    if (slave == NULL || resourcesOffered.count(slave) == 0) {
+      terminateFramework(framework, 0, "Invalid slave in offer reply");
return;
}
- // Check whether the task is on a valid slave
- Slave *slave = lookupSlave(t.slaveId);
- if (!slave || offerResources.find(slave) == offerResources.end()) {
- terminateFramework(framework, 0, "Invalid slave in offer reply");
+
+    // Check whether or not the resources for the task are valid.
+    // TODO(benh): In the future maybe we can also augment the
+    // protobuf to deal with fragmentation purposes by providing some
+    // sort of minimum amount of resources required per task.
+
+    if (task.resources().size() == 0) {
+      terminateFramework(framework, 0, "Invalid resources for task");
return;
}
- responseResources[slave] += res;
+
+    foreach (const Resource& resource, task.resources()) {
+      if (!Resources::isAllocatable(resource)) {
+        // TODO(benh): Also send back the invalid resources as a string?
+        terminateFramework(framework, 0, "Invalid resources for task");
+        return;
+      }
+    }
+
+    resourcesUsed[slave] += task.resources();
}
- // Check that the total accepted on each slave isn't more than offered
- foreachpair (Slave *s, Resources& respRes, responseResources) {
- Resources &offRes = offerResources[s];
- if (respRes.cpus > offRes.cpus || respRes.mem > offRes.mem) {
+  // Check that the total accepted on each slave isn't more than offered.
+  foreachpair (Slave* slave, const Resources& used, resourcesUsed) {
+    if (!(used <= resourcesOffered[slave])) {
terminateFramework(framework, 0, "Too many resources accepted");
return;
}
}
- // Check that there are no duplicate task IDs
+  // Check that there are no duplicate task IDs.
unordered_set<TaskID> idsInResponse;
- foreach (const TaskDescription &t, tasks) {
- if (framework->tasks.find(t.taskId) != framework->tasks.end() ||
- idsInResponse.find(t.taskId) != idsInResponse.end()) {
- terminateFramework(framework, 0,
- "Duplicate task ID: " + lexical_cast<string>(t.taskId));
+  foreach (const TaskDescription& task, tasks) {
+    if (framework->tasks.count(task.task_id()) > 0 ||
+        idsInResponse.count(task.task_id()) > 0) {
+      terminateFramework(framework, 0, "Duplicate task ID: " +
+                         lexical_cast<string>(task.task_id()));
return;
}
- idsInResponse.insert(t.taskId);
+    idsInResponse.insert(task.task_id());
}
- // Launch the tasks in the response
- foreach (const TaskDescription &t, tasks) {
- launchTask(framework, t);
- }
-
- // If there are resources left on some slaves, add filters for them
- vector<SlaveResources> resourcesLeft;
- int timeout = params.getInt32("timeout", DEFAULT_REFUSAL_TIMEOUT);
- double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
- foreachpair (Slave *s, Resources offRes, offerResources) {
- Resources respRes = responseResources[s];
- Resources left = offRes - respRes;
- if (left.cpus > 0 || left.mem > 0) {
- resourcesLeft.push_back(SlaveResources(s, left));
- }
- if (timeout != 0 && respRes.cpus == 0 && respRes.mem == 0) {
- LOG(INFO) << "Adding filter on " << s << " to " << framework
- << " for " << timeout << " seconds";
- framework->slaveFilter[s] = expiry;
- }
+  // Launch the tasks in the response.
+  foreach (const TaskDescription& task, tasks) {
+    launchTask(framework, task);
}
-
- // Return the resources left to the allocator
- removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesLeft);
-}
+  // Get out the timeout for left over resources (if exists), and use
+  // that to calculate the expiry timeout.
+  int timeout = DEFAULT_REFUSAL_TIMEOUT;
-void Master::launchTask(Framework *framework, const TaskDescription& t)
-{
- Params params(t.params);
- Resources res(params.getInt32("cpus", -1),
- params.getInt32("mem", -1));
+  for (int i = 0; i < params.param_size(); i++) {
+    if (params.param(i).key() == "timeout") {
+      // The value is supplied by the framework and may not be a number.
+      // The tasks above have already been launched, so fall back to the
+      // default timeout instead of letting the exception escape.
+      try {
+        timeout = lexical_cast<int>(params.param(i).value());
+      } catch (const bad_lexical_cast&) {
+        LOG(WARNING) << "Ignoring invalid timeout '"
+                     << params.param(i).value() << "' in offer reply";
+      }
+      break;
+    }
+  }
- // The invariant right now is that launchTask is called only for
- // TaskDescriptions where the slave is still valid (see the code
- // above in processOfferReply).
- Slave *slave = lookupSlave(t.slaveId);
- CHECK(slave != NULL);
+  // A timeout of -1 yields an expiry of 0; any other value expires
+  // `timeout` seconds from now.
+  double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
- Task *task = new Task(t.taskId, framework->id, res, TASK_STARTING,
- t.name, "", slave->id);
+  // Now check for unused resources on slaves and add filters for them.
+  vector<SlaveResources> resourcesUnused;
- framework->addTask(task);
- slave->addTask(task);
+  foreachpair (Slave* slave, const Resources& offered, resourcesOffered) {
+    Resources used = resourcesUsed[slave];
+    Resources unused = offered - used;
- allocator->taskAdded(task);
+    CHECK(used == used.allocatable());
- LOG(INFO) << "Launching " << task << " on " << slave;
- send(slave->pid, pack<M2S_RUN_TASK>(
- framework->id, t.taskId, framework->name, framework->user,
- framework->executorInfo, t.name, t.arg, t.params, framework->pid));
-}
+    Resources allocatable = unused.allocatable();
+    if (allocatable.size() > 0) {
+      resourcesUnused.push_back(SlaveResources(slave, allocatable));
+    }
-void Master::rescindOffer(SlotOffer *offer)
-{
- removeSlotOffer(offer, ORR_OFFER_RESCINDED, offer->resources);
+    // Only add a filter on a slave if none of the resources are used.
+    if (timeout != 0 && used.size() == 0) {
+      LOG(INFO) << "Adding filter on " << slave << " to " << framework
+                << " for " << timeout << " seconds";
+      framework->slaveFilter[slave] = expiry;
+    }
+  }
+
+  // Return the resources left to the allocator.
+  removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
}
-void Master::killTask(Task *task)
+void Master::launchTask(Framework* framework, const TaskDescription& task) // records the task with its framework, slave and the allocator, then asks the slave to run it
{
- LOG(INFO) << "Killing " << task;
- Framework *framework = lookupFramework(task->frameworkId);
- Slave *slave = lookupSlave(task->slaveId);
- CHECK(framework != NULL);
+ // The invariant right now is that launchTask is called only for
+ // TaskDescriptions where the slave is still valid (see the code
+ // above in processOfferReply).
+ Slave* slave = lookupSlave(task.slave_id());
CHECK(slave != NULL);
- send(slave->pid, pack<M2S_KILL_TASK>(framework->id, task->id));
+
+ // Determine the executor ID for this task.
+ const ExecutorID& executorId = task.has_executor() // a task-specific executor wins over the framework's executor
+ ? task.executor().executor_id()
+ : framework->info.executor().executor_id();
+
+ Task* t = new Task(); // master-side record of the task, filled in from the description below
+ t->mutable_framework_id()->MergeFrom(framework->frameworkId);
+ t->mutable_executor_id()->MergeFrom(executorId);
+ t->set_state(TASK_STARTING);
+ t->set_name(task.name());
+ t->mutable_task_id()->MergeFrom(task.task_id());
+ t->mutable_slave_id()->MergeFrom(task.slave_id());
+ t->mutable_resources()->MergeFrom(task.resources());
+
+ framework->addTask(t); // the same Task pointer is registered with both the framework and the slave
+ slave->addTask(t);
+
+ allocator->taskAdded(t);
+
+ LOG(INFO) << "Launching " << t << " on " << slave;
+
+ MSG<M2S_RUN_TASK> out;
+ out.mutable_framework()->MergeFrom(framework->info);
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ out.mutable_task()->MergeFrom(task); // the original description is forwarded unchanged
+ send(slave->pid, out);
+
+ statistics.launched_tasks++;
}
// Terminate a framework, sending it a particular error message
// TODO: Make the error codes and messages programmer-friendly
-void Master::terminateFramework(Framework *framework,
+void Master::terminateFramework(Framework* framework,
int32_t code,
- const std::string& message)
+ const string& message)
{
LOG(INFO) << "Terminating " << framework << " due to error: " << message;
- send(framework->pid, pack<M2F_ERROR>(code, message));
+
+ MSG<M2F_ERROR> out;
+ out.set_code(code);
+ out.set_message(message);
+ send(framework->pid, out);
+
removeFramework(framework);
}
// Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeSlotOffer(SlotOffer *offer,
+void Master::removeSlotOffer(SlotOffer* offer,
OfferReturnReason reason,
- const vector<SlaveResources>& resourcesLeft)
+ const vector<SlaveResources>& resourcesUnused)
{
- // Remove from slaves
+ // Remove from slaves.
foreach (SlaveResources& r, offer->resources) {
CHECK(r.slave != NULL);
r.slave->resourcesOffered -= r.resources;
@@ -923,30 +1483,35 @@ void Master::removeSlotOffer(SlotOffer *
Framework *framework = lookupFramework(offer->frameworkId);
CHECK(framework != NULL);
framework->removeOffer(offer);
+
// Also send framework a rescind message unless the reason we are
// removing the offer is that the framework replied to it
if (reason != ORR_FRAMEWORK_REPLIED) {
- send(framework->pid, pack<M2F_RESCIND_OFFER>(offer->id));
+ MSG<M2F_RESCIND_OFFER> out;
+ out.mutable_offer_id()->MergeFrom(offer->offerId);
+ send(framework->pid, out);
}
- // Tell the allocator about the resources freed up
- allocator->offerReturned(offer, reason, resourcesLeft);
+ // Tell the allocator about the unused resources.
+ allocator->offerReturned(offer, reason, resourcesUnused);
// Delete it
- slotOffers.erase(offer->id);
+ slotOffers.erase(offer->offerId);
delete offer;
}
-void Master::addFramework(Framework *framework)
+void Master::addFramework(Framework* framework)
{
- CHECK(frameworks.find(framework->id) == frameworks.end());
+ CHECK(frameworks.count(framework->frameworkId) == 0);
- frameworks[framework->id] = framework;
- pidToFid[framework->pid] = framework->id;
+ frameworks[framework->frameworkId] = framework;
+ pidToFrameworkId[framework->pid] = framework->frameworkId;
link(framework->pid);
- send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
+ MSG<M2F_REGISTER_REPLY> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ send(framework->pid, out);
allocator->frameworkAdded(framework);
}
@@ -954,51 +1519,68 @@ void Master::addFramework(Framework *fra
// Replace the scheduler for a framework with a new process ID, in the
// event of a scheduler failover.
-void Master::failoverFramework(Framework *framework, const PID &newPid)
+void Master::failoverFramework(Framework* framework, const UPID& newPid)
{
- PID oldPid = framework->pid;
+ const UPID& oldPid = framework->pid;
- // Remove the framework's slot offers.
+ // Remove the framework's slot offers (if they weren't removed before).
// TODO(benh): Consider just reoffering these to the new framework.
- unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
- foreach (SlotOffer* offer, slotOffersCopy) {
+ foreachcopy (SlotOffer* offer, framework->slotOffers) {
removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
- send(oldPid, pack<M2F_ERROR>(1, "Framework failover"));
+ MSG<M2F_ERROR> out;
+ out.set_code(1);
+ out.set_message("Framework failover");
+ send(oldPid, out);
+
+ // TODO(benh): unlink(oldPid);
+ pidToFrameworkId.erase(oldPid);
+ pidToFrameworkId[newPid] = framework->frameworkId;
- // TODO(benh): unlink(old->pid);
- pidToFid.erase(oldPid);
- pidToFid[newPid] = framework->id;
framework->pid = newPid;
link(newPid);
- send(newPid, pack<M2F_REGISTER_REPLY>(framework->id));
+ // Kill the failover timer.
+ if (framework->failoverTimer != NULL) {
+ process::post(framework->failoverTimer->self(), process::TERMINATE);
+ process::wait(framework->failoverTimer->self());
+ delete framework->failoverTimer;
+ framework->failoverTimer = NULL;
+ }
+
+ // Make sure we can get offers again.
+ framework->active = true;
+
+ MSG<M2F_REGISTER_REPLY> reply;
+ reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ send(newPid, reply);
}
// Kill all of a framework's tasks, delete the framework object, and
// reschedule slot offers for slots that were assigned to this framework
-void Master::removeFramework(Framework *framework)
+void Master::removeFramework(Framework* framework)
{
framework->active = false;
// TODO: Notify allocator that a framework removal is beginning?
// Tell slaves to kill the framework
- foreachpair (_, Slave *slave, slaves)
- send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
+ foreachpair (_, Slave *slave, slaves) {
+ MSG<M2S_KILL_FRAMEWORK> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ send(slave->pid, out);
+ }
// Remove pointers to the framework's tasks in slaves
- unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
- foreachpair (_, Task *task, tasksCopy) {
- Slave *slave = lookupSlave(task->slaveId);
+ foreachpaircopy (_, Task *task, framework->tasks) {
+ Slave *slave = lookupSlave(task->slave_id());
CHECK(slave != NULL);
removeTask(task, TRR_FRAMEWORK_LOST);
}
- // Remove the framework's slot offers
- unordered_set<SlotOffer *> slotOffersCopy = framework->slotOffers;
- foreach (SlotOffer* offer, slotOffersCopy) {
+ // Remove the framework's slot offers (if they weren't removed before).
+ foreachcopy (SlotOffer* offer, framework->slotOffers) {
removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
}
@@ -1006,25 +1588,88 @@ void Master::removeFramework(Framework *
// failoverFramework needs to be shared!
// TODO(benh): unlink(framework->pid);
- pidToFid.erase(framework->pid);
+ pidToFrameworkId.erase(framework->pid);
- // Delete it
- frameworks.erase(framework->id);
+ // Delete it.
+ frameworks.erase(framework->frameworkId);
allocator->frameworkRemoved(framework);
delete framework;
}
+void Master::addSlave(Slave* slave)
+{
+ CHECK(slave != NULL);
+
+ slaves[slave->slaveId] = slave;
+ pidToSlaveId[slave->pid] = slave->slaveId;
+ link(slave->pid);
+
+ allocator->slaveAdded(slave);
+
+ MSG<M2S_REGISTER_REPLY> out;
+ out.mutable_slave_id()->MergeFrom(slave->slaveId);
+ send(slave->pid, out);
+
+ // TODO(benh):
+ // // Ask the slaves manager to monitor this slave for us.
+ // process::dispatch(slavesManager->self(), &SlavesManager::monitor,
+ // slave->pid, slave->info, slave->slaveId);
+
+ // Set up an observer for the slave.
+ slave->observer = new SlaveObserver(slave->pid, slave->info,
+ slave->slaveId, slavesManager->self());
+ process::spawn(slave->observer);
+}
+
+
+void Master::readdSlave(Slave* slave, const vector<Task>& tasks)
+{
+ CHECK(slave != NULL);
+
+ addSlave(slave);
+
+ for (int i = 0; i < tasks.size(); i++) {
+ Task* task = new Task(tasks[i]);
+
+ // Add the task to the slave.
+ slave->addTask(task);
+
+ // Try to add the task to the framework too. The framework
+ // might not be connected yet, in which case we can't add the
+ // task now; it will be added when the framework connects
+ // later. We also tell this slave the current framework pid
+ // for this task; that too is skipped for now if the
+ // framework currently isn't registered.
+ Framework* framework = lookupFramework(task->framework_id());
+ if (framework != NULL) {
+ framework->addTask(task);
+ MSG<M2S_UPDATE_FRAMEWORK> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ send(slave->pid, out);
+ } else {
+ // TODO(benh): We should really put a timeout on how long we
+ // keep tasks running on a slave that never have frameworks
+ // reregister and claim them.
+ LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+ << " of framework " << task->framework_id()
+ << " running on slave " << slave->slaveId;
+ }
+ }
+}
+
+
// Lose all of a slave's tasks and delete the slave object
-void Master::removeSlave(Slave *slave)
+void Master::removeSlave(Slave* slave)
{
slave->active = false;
+
// TODO: Notify allocator that a slave removal is beginning?
// Remove pointers to slave's tasks in frameworks, and send status updates
- unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
- foreachpair (_, Task *task, tasksCopy) {
- Framework *framework = lookupFramework(task->frameworkId);
+ foreachpaircopy (_, Task* task, slave->tasks) {
+ Framework *framework = lookupFramework(task->framework_id());
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected. This can be a tricky
// situation for frameworks that want to have high-availability,
@@ -1033,18 +1678,23 @@ void Master::removeSlave(Slave *slave)
// want to do is create a local Framework object to represent that
// framework until it fails over. See the TODO above in
// S2M_REREGISTER_SLAVE.
- if (framework != NULL)
- send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
- task->message));
+ if (framework != NULL) {
+ MSG<M2F_STATUS_UPDATE> out;
+ out.mutable_framework_id()->MergeFrom(task->framework_id());
+ TaskStatus* status = out.mutable_status();
+ status->mutable_task_id()->MergeFrom(task->task_id());
+ status->mutable_slave_id()->MergeFrom(task->slave_id());
+ status->set_state(TASK_LOST);
+ send(framework->pid, out);
+ }
removeTask(task, TRR_SLAVE_LOST);
}
// Remove slot offers from the slave; this will also rescind them
- unordered_set<SlotOffer *> slotOffersCopy = slave->slotOffers;
- foreach (SlotOffer *offer, slotOffersCopy) {
+ foreachcopy (SlotOffer* offer, slave->slotOffers) {
// Only report resources on slaves other than this one to the allocator
vector<SlaveResources> otherSlaveResources;
- foreach (SlaveResources& r, offer->resources) {
+ foreach (const SlaveResources& r, offer->resources) {
if (r.slave != slave) {
otherSlaveResources.push_back(r);
}
@@ -1053,32 +1703,46 @@ void Master::removeSlave(Slave *slave)
}
// Remove slave from any filters
- foreachpair (_, Framework *framework, frameworks)
+ foreachpair (_, Framework* framework, frameworks) {
framework->slaveFilter.erase(slave);
+ }
// Send lost-slave message to all frameworks (this helps them re-run
// previously finished tasks whose output was on the lost slave)
- foreachpair (_, Framework *framework, frameworks)
- send(framework->pid, pack<M2F_LOST_SLAVE>(slave->id));
+ foreachpair (_, Framework* framework, frameworks) {
+ MSG<M2F_LOST_SLAVE> out;
+ out.mutable_slave_id()->MergeFrom(slave->slaveId);
+ send(framework->pid, out);
+ }
+
+ // TODO(benh):
+ // // Tell the slaves manager to stop monitoring this slave for us.
+ // process::dispatch(slavesManager->self(), &SlavesManager::forget,
+ // slave->pid, slave->info, slave->slaveId);
+
+ // Kill the slave observer.
+ process::post(slave->observer->self(), process::TERMINATE);
+ process::wait(slave->observer->self());
+ delete slave->observer;
// TODO(benh): unlink(slave->pid);
- pidToSid.erase(slave->pid);
+ pidToSlaveId.erase(slave->pid);
// Delete it
- slaves.erase(slave->id);
+ slaves.erase(slave->slaveId);
allocator->slaveRemoved(slave);
delete slave;
}
// Remove a task (e.g., because we lost its framework or slave)
-void Master::removeTask(Task *task, TaskRemovalReason reason)
+void Master::removeTask(Task* task, TaskRemovalReason reason)
{
- Framework *framework = lookupFramework(task->frameworkId);
- Slave *slave = lookupSlave(task->slaveId);
+ Framework* framework = lookupFramework(task->framework_id());
+ Slave* slave = lookupSlave(task->slave_id());
CHECK(framework != NULL);
CHECK(slave != NULL);
- framework->removeTask(task->id);
+ framework->removeTask(task->task_id());
slave->removeTask(task);
allocator->taskRemoved(task, reason);
delete task;
@@ -1097,14 +1761,31 @@ Allocator* Master::createAllocator()
// and FWID is an increasing integer.
FrameworkID Master::newFrameworkId()
{
- int fwId = nextFrameworkId++;
ostringstream oss;
- oss << masterId << "-" << setw(4) << setfill('0') << fwId;
- return oss.str();
+ oss << masterId << "-" << setw(4) << setfill('0') << nextFrameworkId++;
+ FrameworkID frameworkId;
+ frameworkId.set_value(oss.str());
+ return frameworkId;
+}
+
+
+OfferID Master::newOfferId()
+{
+ OfferID offerId;
+ offerId.set_value(masterId + "-" + lexical_cast<string>(nextOfferId++));
+ return offerId;
+}
+
+
+SlaveID Master::newSlaveId()
+{
+ SlaveID slaveId;
+ slaveId.set_value(masterId + "-" + lexical_cast<string>(nextSlaveId++));
+ return slaveId;
}
-const Params& Master::getConf()
+const Configuration& Master::getConfiguration()
{
return conf;
}