You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC
svn commit: r1140024 [3/15] - in /incubator/mesos/trunk: ./ ec2/
ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/
include/mesos/ src/ src/common/ src/configurator/ src/detector/
src/examples/ src/examples/java/ src/examples/python/...
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Mon Jun 27 06:08:33 2011
@@ -1,17 +1,18 @@
-#include <iomanip>
#include <fstream>
+#include <iomanip>
#include <sstream>
#include <glog/logging.h>
-#include <google/protobuf/descriptor.h>
-
#include <process/run.hpp>
+#include <process/timer.hpp>
#include "config/config.hpp"
#include "common/build.hpp"
#include "common/date_utils.hpp"
+#include "common/utils.hpp"
+#include "common/uuid.hpp"
#include "allocator.hpp"
#include "allocator_factory.hpp"
@@ -19,116 +20,10 @@
#include "slaves_manager.hpp"
#include "webui.hpp"
-using namespace mesos;
-using namespace mesos::internal;
-using namespace mesos::internal::master;
-
-using boost::bad_lexical_cast;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using process::HttpOKResponse;
-using process::HttpResponse;
-using process::HttpRequest;
-using process::PID;
-using process::Process;
-using process::Promise;
-using process::UPID;
-
-using std::endl;
-using std::map;
-using std::max;
-using std::ostringstream;
-using std::pair;
-using std::set;
-using std::setfill;
-using std::setw;
using std::string;
using std::vector;
-
-namespace {
-
-// A process that periodically pings the master to check filter
-// expiries, etc.
-class AllocatorTimer : public Process<AllocatorTimer>
-{
-public:
- AllocatorTimer(const PID<Master>& _master) : master(_master) {}
-
-protected:
- virtual void operator () ()
- {
- link(master);
- while (true) {
- receive(1);
- if (name() == process::TIMEOUT) {
- process::dispatch(master, &Master::timerTick);
- } else if (name() == process::EXITED) {
- return;
- }
- }
- }
-
-private:
- const PID<Master> master;
-};
-
-
-// A process that periodically prints frameworks' shares to a file
-class SharesPrinter : public Process<SharesPrinter>
-{
-public:
- SharesPrinter(const PID<Master>& _master) : master(_master) {}
- ~SharesPrinter() {}
-
-protected:
- virtual void operator () ()
- {
- int tick = 0;
-
- std::ofstream file ("/mnt/shares");
- if (!file.is_open()) {
- LOG(FATAL) << "Could not open /mnt/shares";
- }
-
- while (true) {
- pause(1);
-
- state::MasterState* state = call(master, &Master::getState);
-
- uint32_t total_cpus = 0;
- uint32_t total_mem = 0;
-
- foreach (state::Slave* s, state->slaves) {
- total_cpus += s->cpus;
- total_mem += s->mem;
- }
-
- if (state->frameworks.empty()) {
- file << "--------------------------------" << endl;
- } else {
- foreach (state::Framework* f, state->frameworks) {
- double cpu_share = f->cpus / (double) total_cpus;
- double mem_share = f->mem / (double) total_mem;
- double max_share = max(cpu_share, mem_share);
- file << tick << "#" << f->id << "#" << f->name << "#"
- << f->cpus << "#" << f->mem << "#"
- << cpu_share << "#" << mem_share << "#" << max_share << endl;
- }
- }
- delete state;
- tick++;
- }
- file.close();
- }
-
-private:
- const PID<Master> master;
-};
-
-} // namespace {
+using process::wait; // Necessary on some OS's to disambiguate.
namespace mesos { namespace internal { namespace master {
@@ -140,8 +35,12 @@ public:
const SlaveInfo& _slaveInfo,
const SlaveID& _slaveId,
const PID<SlavesManager>& _slavesManager)
- : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
- slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+ : slave(_slave),
+ slaveInfo(_slaveInfo),
+ slaveId(_slaveId),
+ slavesManager(_slavesManager),
+ timeouts(0),
+ pinged(false) {}
virtual ~SlaveObserver() {}
@@ -152,31 +51,31 @@ protected:
// we don't hear a pong, increment the number of timeouts from the
// slave and try and send another ping. If we eventually timeout too
// many missed pongs in a row, consider the slave dead.
- send(slave, PING);
+ send(slave, "PING");
pinged = true;
do {
receive(SLAVE_PONG_TIMEOUT);
- if (name() == PONG) {
+ if (name() == "PONG") {
timeouts = 0;
pinged = false;
- } else if (name() == process::TIMEOUT) {
+ } else if (name() == TIMEOUT) {
if (pinged) {
timeouts++;
pinged = false;
}
- send(slave, PING);
+ send(slave, "PING");
pinged = true;
- } else if (name() == process::TERMINATE) {
+ } else if (name() == TERMINATE) {
return;
}
} while (timeouts < MAX_SLAVE_TIMEOUTS);
// Tell the slave manager to deactivate the slave, this will take
// care of updating the master too.
- while (!process::call(slavesManager, &SlavesManager::deactivate,
- slaveInfo.hostname(), slave.port)) {
+ while (!call(slavesManager, &SlavesManager::deactivate,
+ slaveInfo.hostname(), slave.port)) {
LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
pause(5);
}
@@ -191,18 +90,76 @@ private:
bool pinged;
};
-}}} // namespace mesos { namespace master { namespace internal {
+
+// Performs slave registration asynchronously. There are two means of
+// doing this, one first tries to add this slave to the slaves
+// manager, while the other one simply tells the master to add the
+// slave.
+// Performs slave registration asynchronously (Master::registerSlave
+// invokes one of these overloads via process::run). There are two
+// means of doing this: one first tries to add this slave to the
+// slaves manager, while the other one simply tells the master to add
+// the slave.
+//
+// Both overloads return true once the slave has been dispatched to
+// Master::addSlave, and false if the slaves manager refused it (in
+// which case 'slave' has already been deleted here).
+struct SlaveRegistrar
+{
+  // Dispatches 'slave' to Master::addSlave; always returns true.
+  static bool run(Slave* slave, const PID<Master>& master)
+  {
+    // TODO(benh): Do a reverse lookup to ensure IP maps to
+    // hostname, or check credentials of this slave.
+    dispatch(master, &Master::addSlave, slave, false);
+
+    // A value must be returned: falling off the end of a non-void
+    // function is undefined behavior, and the overload below
+    // forwards this result.
+    return true;
+  }
+
+  // Asks the slaves manager to accept this slave's hostname:port
+  // first; only on success is the slave handed to the master.
+  static bool run(Slave* slave,
+                  const PID<Master>& master,
+                  const PID<SlavesManager>& slavesManager)
+  {
+    if (!call(slavesManager, &SlavesManager::add,
+              slave->info.hostname(), slave->pid.port)) {
+      LOG(WARNING) << "Could not register slave because failed"
+                   << " to add it to the slaves manager";
+      delete slave;
+      return false;
+    }
+
+    return run(slave, master);
+  }
+};
+
+
+// Performs slave re-registration asynchronously as above.
+// Performs slave re-registration asynchronously, mirroring
+// SlaveRegistrar: one overload first adds the slave to the slaves
+// manager, the other simply tells the master to re-add the slave
+// together with the tasks it reported.
+//
+// Both overloads return true once the slave has been dispatched to
+// Master::readdSlave, and false if the slaves manager refused it (in
+// which case 'slave' has already been deleted here).
+struct SlaveReregistrar
+{
+  // Dispatches 'slave' and its reported 'tasks' to Master::readdSlave;
+  // always returns true.
+  static bool run(Slave* slave,
+                  const vector<Task>& tasks,
+                  const PID<Master>& master)
+  {
+    // TODO(benh): Do a reverse lookup to ensure IP maps to
+    // hostname, or check credentials of this slave.
+    dispatch(master, &Master::readdSlave, slave, tasks);
+
+    // A value must be returned: falling off the end of a non-void
+    // function is undefined behavior, and the overload below
+    // forwards this result.
+    return true;
+  }
+
+  // Asks the slaves manager to accept this slave's hostname:port
+  // first; only on success is the slave handed to the master.
+  static bool run(Slave* slave,
+                  const vector<Task>& tasks,
+                  const PID<Master>& master,
+                  const PID<SlavesManager>& slavesManager)
+  {
+    if (!call(slavesManager, &SlavesManager::add,
+              slave->info.hostname(), slave->pid.port)) {
+      LOG(WARNING) << "Could not re-register slave because failed"
+                   << " to add it to the slaves manager";
+      delete slave;
+      return false;
+    }
+
+    return run(slave, tasks, master);
+  }
+};
+// Constructs the master process (named "master") without an explicit
+// Configuration ('conf' is left default-constructed), then runs
+// initialize() to set up counters and message/HTTP handlers.
Master::Master()
- : MesosProcess<Master>("master")
+ : ProcessBase("master")
{
initialize();
}
Master::Master(const Configuration& conf)
- : MesosProcess<Master>("master"),
+ : ProcessBase("master"),
conf(conf)
{
initialize();
@@ -213,34 +170,39 @@ Master::~Master()
{
LOG(INFO) << "Shutting down master";
- foreachpaircopy (_, Framework* framework, frameworks) {
+ foreachvalue (Framework* framework, utils::copy(frameworks)) {
removeFramework(framework);
}
- foreachpaircopy (_, Slave* slave, slaves) {
+ foreachvalue (Slave* slave, utils::copy(slaves)) {
removeSlave(slave);
}
- delete allocator;
-
- CHECK(slotOffers.size() == 0);
+ CHECK(offers.size() == 0);
- process::post(slavesManager->self(), process::TERMINATE);
- process::wait(slavesManager->self());
+ terminate(slavesManager);
+ wait(slavesManager);
delete slavesManager;
+
+ delete allocator;
}
+// Registers the master's configuration options ("allocator" and
+// "root_submissions") along with those of the SlavesManager.
void Master::registerOptions(Configurator* configurator)
{
SlavesManager::registerOptions(configurator);
- configurator->addOption<string>("allocator", 'a',
- "Allocation module name",
- "simple");
- configurator->addOption<bool>("root_submissions",
- "Can root submit frameworks?",
- true);
+
+  // Name of the allocation module to instantiate; defaults to "simple".
+ configurator->addOption<string>(
+ "allocator",
+ 'a',
+ "Allocation module name",
+ "simple");
+
+  // Whether frameworks may register as user "root"; checked in
+  // Master::registerFramework. Defaults to true.
+ configurator->addOption<bool>(
+ "root_submissions",
+ "Can root submit frameworks?",
+ true);
}
@@ -249,7 +211,7 @@ Promise<state::MasterState*> Master::get
state::MasterState* state =
new state::MasterState(build::DATE, build::USER, self());
- foreachpair (_, Slave* s, slaves) {
+ foreachvalue (Slave* s, slaves) {
Resources resources(s->info.resources());
Resource::Scalar cpus;
Resource::Scalar mem;
@@ -259,14 +221,14 @@ Promise<state::MasterState*> Master::get
mem = resources.getScalar("mem", mem);
state::Slave* slave =
- new state::Slave(s->slaveId.value(), s->info.hostname(),
+ new state::Slave(s->id.value(), s->info.hostname(),
s->info.public_hostname(), cpus.value(),
- mem.value(), s->connectTime);
+ mem.value(), s->registeredTime);
state->slaves.push_back(slave);
}
- foreachpair (_, Framework* f, frameworks) {
+ foreachvalue (Framework* f, frameworks) {
Resources resources(f->resources);
Resource::Scalar cpus;
Resource::Scalar mem;
@@ -276,13 +238,13 @@ Promise<state::MasterState*> Master::get
mem = resources.getScalar("mem", mem);
state::Framework* framework =
- new state::Framework(f->frameworkId.value(), f->info.user(),
+ new state::Framework(f->id.value(), f->info.user(),
f->info.name(), f->info.executor().uri(),
- cpus.value(), mem.value(), f->connectTime);
+ cpus.value(), mem.value(), f->registeredTime);
state->frameworks.push_back(framework);
- foreachpair (_, Task* t, f->tasks) {
+ foreachvalue (Task* t, f->tasks) {
Resources resources(t->resources());
Resource::Scalar cpus;
Resource::Scalar mem;
@@ -300,9 +262,9 @@ Promise<state::MasterState*> Master::get
framework->tasks.push_back(task);
}
- foreach (SlotOffer* o, f->slotOffers) {
- state::SlotOffer* offer =
- new state::SlotOffer(o->offerId.value(), o->frameworkId.value());
+ foreach (Offer* o, f->offers) {
+ state::Offer* offer =
+ new state::Offer(o->id.value(), o->frameworkId.value());
foreach (const SlaveResources& r, o->resources) {
Resources resources(r.resources);
@@ -314,7 +276,7 @@ Promise<state::MasterState*> Master::get
mem = resources.getScalar("mem", mem);
state::SlaveResources* sr =
- new state::SlaveResources(r.slave->slaveId.value(),
+ new state::SlaveResources(r.slave->id.value(),
cpus.value(), mem.value());
offer->resources.push_back(sr);
@@ -332,7 +294,7 @@ Promise<state::MasterState*> Master::get
vector<Framework*> Master::getActiveFrameworks()
{
vector <Framework*> result;
- foreachpair(_, Framework* framework, frameworks) {
+ foreachvalue (Framework* framework, frameworks) {
if (framework->active) {
result.push_back(framework);
}
@@ -345,7 +307,7 @@ vector<Framework*> Master::getActiveFram
vector<Slave*> Master::getActiveSlaves()
{
vector <Slave*> result;
- foreachpair(_, Slave* slave, slaves) {
+ foreachvalue (Slave* slave, slaves) {
if (slave->active) {
result.push_back(slave);
}
@@ -354,75 +316,48 @@ vector<Slave*> Master::getActiveSlaves()
}
-Framework* Master::lookupFramework(const FrameworkID& frameworkId)
-{
- if (frameworks.count(frameworkId) > 0) {
- return frameworks[frameworkId];
- } else {
- return NULL;
- }
-}
-
-
-Slave* Master::lookupSlave(const SlaveID& slaveId)
-{
- if (slaves.count(slaveId) > 0) {
- return slaves[slaveId];
- } else {
- return NULL;
- }
-}
-
-
-SlotOffer* Master::lookupSlotOffer(const OfferID& offerId)
-{
- if (slotOffers.count(offerId) > 0) {
- return slotOffers[offerId];
- } else {
- return NULL;
- }
-}
-
-
void Master::operator () ()
{
LOG(INFO) << "Master started at mesos://" << self();
// Don't do anything until we get a master token.
- while (receive() != GOT_MASTER_TOKEN) {
+ while (receive() != GotMasterTokenMessage().GetTypeName()) {
LOG(INFO) << "Oops! We're dropping a message since "
<< "we haven't received an identifier yet!";
}
- MSG<GOT_MASTER_TOKEN> msg;
- msg.ParseFromString(body());
+ GotMasterTokenMessage message;
+ message.ParseFromString(body());
// The master ID is comprised of the current date and some ephemeral
// token (e.g., determined by ZooKeeper).
- masterId = DateUtils::currentDate() + "-" + msg.token();
+ masterId = DateUtils::currentDate() + "-" + message.token();
LOG(INFO) << "Master ID: " << masterId;
// Setup slave manager.
slavesManager = new SlavesManager(conf, self());
- process::spawn(slavesManager);
+ spawn(slavesManager);
// Create the allocator (we do this after the constructor because it
// leaks 'this').
- allocator = createAllocator();
+ string type = conf.get("allocator", "simple");
+ LOG(INFO) << "Creating \"" << type << "\" allocator";
+ allocator = AllocatorFactory::instantiate(type, this);
+
if (!allocator) {
- LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+ LOG(FATAL) << "Unrecognized allocator type: " << type;
}
- link(spawn(new AllocatorTimer(self())));
- //link(spawn(new SharesPrinter(self())));
+ // Start our timer ticks.
+ delay(1.0, self(), &Master::timerTick);
while (true) {
serve();
- if (name() == process::TERMINATE) {
+ if (name() == TERMINATE) {
LOG(INFO) << "Asked to terminate by " << from();
- foreachpair (_, Slave* slave, slaves) {
- send(slave->pid, process::TERMINATE);
+ foreachvalue (Slave* slave, slaves) {
+ send(slave->pid, TERMINATE);
}
break;
} else {
@@ -441,90 +376,102 @@ void Master::initialize()
nextSlaveId = 0;
nextOfferId = 0;
- allocatorType = conf.get("allocator", "simple");
-
// Start all the statistics at 0.
- statistics.launched_tasks = 0;
- statistics.finished_tasks = 0;
- statistics.killed_tasks = 0;
- statistics.failed_tasks = 0;
- statistics.lost_tasks = 0;
- statistics.valid_status_updates = 0;
- statistics.invalid_status_updates = 0;
- statistics.valid_framework_messages = 0;
- statistics.invalid_framework_messages = 0;
+ CHECK(TASK_STARTING == TaskState_MIN);
+ CHECK(TASK_LOST == TaskState_MAX);
+ stats.tasks[TASK_STARTING] = 0;
+ stats.tasks[TASK_RUNNING] = 0;
+ stats.tasks[TASK_FINISHED] = 0;
+ stats.tasks[TASK_FAILED] = 0;
+ stats.tasks[TASK_KILLED] = 0;
+ stats.tasks[TASK_LOST] = 0;
+ stats.validStatusUpdates = 0;
+ stats.invalidStatusUpdates = 0;
+ stats.validFrameworkMessages = 0;
+ stats.invalidFrameworkMessages = 0;
startTime = elapsedTime();
// Install handler functions for certain messages.
- install(NEW_MASTER_DETECTED, &Master::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install(NO_MASTER_DETECTED, &Master::noMasterDetected);
+ installProtobufHandler<NewMasterDetectedMessage>(
+ &Master::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
+
+ installProtobufHandler<NoMasterDetectedMessage>(
+ &Master::noMasterDetected);
+
+ installProtobufHandler<RegisterFrameworkMessage>(
+ &Master::registerFramework,
+ &RegisterFrameworkMessage::framework);
+
+ installProtobufHandler<ReregisterFrameworkMessage>(
+ &Master::reregisterFramework,
+ &ReregisterFrameworkMessage::framework_id,
+ &ReregisterFrameworkMessage::framework,
+ &ReregisterFrameworkMessage::generation);
+
+ installProtobufHandler<UnregisterFrameworkMessage>(
+ &Master::unregisterFramework,
+ &UnregisterFrameworkMessage::framework_id);
+
+ installProtobufHandler<ResourceOfferReplyMessage>(
+ &Master::resourceOfferReply,
+ &ResourceOfferReplyMessage::framework_id,
+ &ResourceOfferReplyMessage::offer_id,
+ &ResourceOfferReplyMessage::tasks,
+ &ResourceOfferReplyMessage::params);
+
+ installProtobufHandler<ReviveOffersMessage>(
+ &Master::reviveOffers,
+ &ReviveOffersMessage::framework_id);
+
+ installProtobufHandler<KillTaskMessage>(
+ &Master::killTask,
+ &KillTaskMessage::framework_id,
+ &KillTaskMessage::task_id);
+
+ installProtobufHandler<FrameworkToExecutorMessage>(
+ &Master::schedulerMessage,
+ &FrameworkToExecutorMessage::slave_id,
+ &FrameworkToExecutorMessage::framework_id,
+ &FrameworkToExecutorMessage::executor_id,
+ &FrameworkToExecutorMessage::data);
+
+ installProtobufHandler<RegisterSlaveMessage>(
+ &Master::registerSlave,
+ &RegisterSlaveMessage::slave);
+
+ installProtobufHandler<ReregisterSlaveMessage>(
+ &Master::reregisterSlave,
+ &ReregisterSlaveMessage::slave_id,
+ &ReregisterSlaveMessage::slave,
+ &ReregisterSlaveMessage::tasks);
+
+ installProtobufHandler<UnregisterSlaveMessage>(
+ &Master::unregisterSlave,
+ &UnregisterSlaveMessage::slave_id);
+
+ installProtobufHandler<StatusUpdateMessage>(
+ &Master::statusUpdate,
+ &StatusUpdateMessage::update,
+ &StatusUpdateMessage::pid);
+
+ installProtobufHandler<ExecutorToFrameworkMessage>(
+ &Master::executorMessage,
+ &ExecutorToFrameworkMessage::slave_id,
+ &ExecutorToFrameworkMessage::framework_id,
+ &ExecutorToFrameworkMessage::executor_id,
+ &ExecutorToFrameworkMessage::data);
+
+ installProtobufHandler<ExitedExecutorMessage>(
+ &Master::exitedExecutor,
+ &ExitedExecutorMessage::slave_id,
+ &ExitedExecutorMessage::framework_id,
+ &ExitedExecutorMessage::executor_id,
+ &ExitedExecutorMessage::status);
- install(F2M_REGISTER_FRAMEWORK, &Master::registerFramework,
- &RegisterFrameworkMessage::framework);
-
- install(F2M_REREGISTER_FRAMEWORK, &Master::reregisterFramework,
- &ReregisterFrameworkMessage::framework_id,
- &ReregisterFrameworkMessage::framework,
- &ReregisterFrameworkMessage::generation);
-
- install(F2M_UNREGISTER_FRAMEWORK, &Master::unregisterFramework,
- &UnregisterFrameworkMessage::framework_id);
-
- install(F2M_RESOURCE_OFFER_REPLY, &Master::resourceOfferReply,
- &ResourceOfferReplyMessage::framework_id,
- &ResourceOfferReplyMessage::offer_id,
- &ResourceOfferReplyMessage::tasks,
- &ResourceOfferReplyMessage::params);
-
- install(F2M_REVIVE_OFFERS, &Master::reviveOffers,
- &ReviveOffersMessage::framework_id);
-
- install(F2M_KILL_TASK, &Master::killTask,
- &KillTaskMessage::framework_id,
- &KillTaskMessage::task_id);
-
- install(F2M_FRAMEWORK_MESSAGE, &Master::schedulerMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(F2M_STATUS_UPDATE_ACK, &Master::statusUpdateAck,
- &StatusUpdateAckMessage::framework_id,
- &StatusUpdateAckMessage::task_id,
- &StatusUpdateAckMessage::slave_id);
-
- install(S2M_REGISTER_SLAVE, &Master::registerSlave,
- &RegisterSlaveMessage::slave);
-
- install(S2M_REREGISTER_SLAVE, &Master::reregisterSlave,
- &ReregisterSlaveMessage::slave_id,
- &ReregisterSlaveMessage::slave,
- &ReregisterSlaveMessage::tasks);
-
- install(S2M_UNREGISTER_SLAVE, &Master::unregisterSlave,
- &UnregisterSlaveMessage::slave_id);
-
- install(S2M_STATUS_UPDATE, &Master::statusUpdate,
- &StatusUpdateMessage::framework_id,
- &StatusUpdateMessage::status);
-
- install(S2M_FRAMEWORK_MESSAGE, &Master::executorMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(S2M_EXITED_EXECUTOR, &Master::exitedExecutor,
- &ExitedExecutorMessage::slave_id,
- &ExitedExecutorMessage::framework_id,
- &ExitedExecutorMessage::executor_id,
- &ExitedExecutorMessage::result);
-
- install(process::EXITED, &Master::exited);
+ // Install some message handlers.
+ installMessageHandler(EXITED, &Master::exited);
// Install HTTP request handlers.
installHttpHandler("info.json", &Master::http_info_json);
@@ -536,13 +483,13 @@ void Master::initialize()
}
-void Master::newMasterDetected(const string& pid)
+void Master::newMasterDetected(const UPID& pid)
{
// Check and see if we are (1) still waiting to be the active
// master, (2) newly active master, (3) no longer active master,
// or (4) still active master.
- UPID master(pid);
+ UPID master = pid;
if (master != self() && !active) {
LOG(INFO) << "Waiting to be master!";
@@ -576,20 +523,20 @@ void Master::registerFramework(const Fra
if (framework->info.executor().uri() == "") {
LOG(INFO) << framework << " registering without an executor URI";
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("No executor URI given");
- send(from(), out);
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("No executor URI given");
+ send(from(), message);
delete framework;
} else {
bool rootSubmissions = conf.get<bool>("root_submissions", true);
if (framework->info.user() == "root" && rootSubmissions == false) {
LOG(INFO) << framework << " registering as root, but "
<< "root submissions are disabled on this cluster";
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("User 'root' is not allowed to run frameworks");
- send(from(), out);
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("User 'root' is not allowed to run frameworks");
+ send(from(), message);
delete framework;
}
}
@@ -604,17 +551,17 @@ void Master::reregisterFramework(const F
{
if (frameworkId == "") {
LOG(ERROR) << "Framework re-registering without an id!";
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("Missing framework id");
- send(from(), out);
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("Missing framework id");
+ send(from(), message);
} else if (frameworkInfo.executor().uri() == "") {
LOG(INFO) << "Framework " << frameworkId << " re-registering "
<< "without an executor URI";
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("No executor URI given");
- send(from(), out);
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("No executor URI given");
+ send(from(), message);
} else {
LOG(INFO) << "Re-registering framework " << frameworkId
<< " at " << from();
@@ -630,18 +577,18 @@ void Master::reregisterFramework(const F
// (if necessary) by the master and the master will always
// know which scheduler is the correct one.
if (generation == 0) {
- LOG(INFO) << "Framework " << frameworkId << " failed over";
- failoverFramework(frameworks[frameworkId], from());
// TODO: Should we check whether the new scheduler has given
// us a different framework name, user name or executor info?
+ LOG(INFO) << "Framework " << frameworkId << " failed over";
+ failoverFramework(frameworks[frameworkId], from());
} else {
LOG(INFO) << "Framework " << frameworkId
<< " re-registering with an already used id "
<< " and not failing over!";
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("Framework id in use");
- send(from(), out);
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("Framework id in use");
+ send(from(), message);
return;
}
} else {
@@ -657,8 +604,8 @@ void Master::reregisterFramework(const F
addFramework(framework);
// Add any running tasks reported by slaves for this framework.
foreachpair (const SlaveID& slaveId, Slave* slave, slaves) {
- foreachpair (_, Task* task, slave->tasks) {
- if (framework->frameworkId == task->framework_id()) {
+ foreachvalue (Task* task, slave->tasks) {
+ if (framework->id == task->framework_id()) {
framework->addTask(task);
}
}
@@ -671,11 +618,11 @@ void Master::reregisterFramework(const F
// broadcast because an executor might be running on a slave but
// it currently isn't running any tasks. This could be a
// potential scalability issue ...
- foreachpair (_, Slave* slave, slaves) {
- MSG<M2S_UPDATE_FRAMEWORK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.set_pid(from());
- send(slave->pid, out);
+ foreachvalue (Slave* slave, slaves) {
+ UpdateFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.set_pid(from());
+ send(slave->pid, message);
}
}
}
@@ -685,7 +632,7 @@ void Master::unregisterFramework(const F
{
LOG(INFO) << "Asked to unregister framework " << frameworkId;
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
if (framework->pid == from()) {
removeFramework(framework);
@@ -702,9 +649,9 @@ void Master::resourceOfferReply(const Fr
const vector<TaskDescription>& tasks,
const Params& params)
{
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- SlotOffer* offer = lookupSlotOffer(offerId);
+ Offer* offer = getOffer(offerId);
if (offer != NULL) {
processOfferReply(offer, tasks, params);
} else {
@@ -713,13 +660,17 @@ void Master::resourceOfferReply(const Fr
// immediately report any tasks in it as lost (it would
// probably be better to have better error messages here).
foreach (const TaskDescription& task, tasks) {
- MSG<M2F_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus* status = out.mutable_status();
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ update->mutable_executor_id()->MergeFrom(task.executor().executor_id());
+ update->mutable_slave_id()->MergeFrom(task.slave_id());
+ TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
- status->mutable_slave_id()->MergeFrom(task.slave_id());
status->set_state(TASK_LOST);
- send(framework->pid, out);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
}
}
}
@@ -728,7 +679,7 @@ void Master::resourceOfferReply(const Fr
void Master::reviveOffers(const FrameworkID& frameworkId)
{
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Reviving offers for " << framework;
framework->slaveFilter.clear();
@@ -743,32 +694,40 @@ void Master::killTask(const FrameworkID&
LOG(INFO) << "Asked to kill task " << taskId
<< " of framework " << frameworkId;
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- Task* task = framework->lookupTask(taskId);
+ Task* task = framework->getTask(taskId);
if (task != NULL) {
- Slave* slave = lookupSlave(task->slave_id());
+ Slave* slave = getSlave(task->slave_id());
CHECK(slave != NULL);
- LOG(INFO) << "Telling slave " << slave->slaveId
+ LOG(INFO) << "Telling slave " << slave->id
<< " to kill task " << taskId
<< " of framework " << frameworkId;
- MSG<M2S_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_task_id()->MergeFrom(taskId);
- send(slave->pid, out);
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(slave->pid, message);
} else {
+ // TODO(benh): Once the scheduler has persistance and
+ // high-availability of it's tasks, it will be the one that
+ // determines that this invocation of 'killTask' is silly, and
+ // can just return "locally" (i.e., after hitting only the other
+ // replicas). Unfortunately, it still won't know the slave id.
+
LOG(WARNING) << "Cannot kill task " << taskId
<< " of framework " << frameworkId
<< " because it cannot be found";
- MSG<M2F_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- TaskStatus *status = out.mutable_status();
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(frameworkId);
+ TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(taskId);
- status->mutable_slave_id()->set_value("UNKNOWN");
status->set_state(TASK_LOST);
- send(framework->pid, out);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
}
}
}
@@ -779,58 +738,32 @@ void Master::schedulerMessage(const Slav
const ExecutorID& executorId,
const string& data)
{
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
- Slave* slave = lookupSlave(slaveId);
+ Slave* slave = getSlave(slaveId);
if (slave != NULL) {
LOG(INFO) << "Sending framework message for framework "
<< frameworkId << " to slave " << slaveId;
- MSG<M2S_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(slave->pid, out);
+
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(slave->pid, message);
+
+ stats.validFrameworkMessages++;
} else {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
<< " because slave does not exist";
+ stats.invalidFrameworkMessages++;
}
} else {
LOG(WARNING) << "Cannot send framework message for framework "
<< frameworkId << " to slave " << slaveId
<< " because framework does not exist";
- }
-}
-
-
-void Master::statusUpdateAck(const FrameworkID& frameworkId,
- const TaskID& taskId,
- const SlaveID& slaveId)
-{
- Framework* framework = lookupFramework(frameworkId);
- if (framework != NULL) {
- Slave* slave = lookupSlave(slaveId);
- if (slave != NULL) {
- LOG(INFO) << "Sending slave " << slaveId
- << " status update acknowledgement for task " << taskId
- << " of framework " << frameworkId;
- MSG<M2S_STATUS_UPDATE_ACK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_task_id()->MergeFrom(taskId);
- send(slave->pid, out);
- } else {
- LOG(WARNING) << "Cannot tell slave " << slaveId
- << " of status update acknowledgement for task " << taskId
- << " of framework " << frameworkId
- << " because slave does not exist";
- }
- } else {
- LOG(WARNING) << "Cannot tell slave " << slaveId
- << " of status update acknowledgement for task " << taskId
- << " of framework " << frameworkId
- << " because framework does not exist";
+ stats.invalidFrameworkMessages++;
}
}
@@ -839,44 +772,19 @@ void Master::registerSlave(const SlaveIn
{
Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsedTime());
- LOG(INFO) << "Attempting to register slave " << slave->slaveId
+ LOG(INFO) << "Attempting to register slave " << slave->id
<< " at " << slave->pid;
- struct SlaveRegistrar
- {
- static bool run(Slave* slave, const PID<Master>& master)
- {
- // TODO(benh): Do a reverse lookup to ensure IP maps to
- // hostname, or check credentials of this slave.
- process::dispatch(master, &Master::addSlave, slave);
- }
-
- static bool run(Slave* slave,
- const PID<Master>& master,
- const PID<SlavesManager>& slavesManager)
- {
- if (!process::call(slavesManager, &SlavesManager::add,
- slave->info.hostname(), slave->pid.port)) {
- LOG(WARNING) << "Could not register slave because failed"
- << " to add it to the slaves maanger";
- delete slave;
- return false;
- }
-
- return run(slave, master);
- }
- };
-
- // Check whether this slave can be accepted, or if all slaves are accepted.
+ // Checks if this slave, or if all slaves, can be accepted.
if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
- process::run(&SlaveRegistrar::run, slave, self());
+ run(&SlaveRegistrar::run, slave, self());
} else if (conf.get<string>("slaves", "*") == "*") {
- process::run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
+ run(&SlaveRegistrar::run, slave, self(), slavesManager->self());
} else {
LOG(WARNING) << "Cannot register slave at "
<< slaveInfo.hostname() << ":" << from().port
<< " because not in allocated set of slaves!";
- send(from(), process::TERMINATE);
+ send(from(), TERMINATE);
}
}
@@ -887,9 +795,9 @@ void Master::reregisterSlave(const Slave
{
if (slaveId == "") {
LOG(ERROR) << "Slave re-registered without an id!";
- send(from(), process::TERMINATE);
+ send(from(), TERMINATE);
} else {
- Slave* slave = lookupSlave(slaveId);
+ Slave* slave = getSlave(slaveId);
if (slave != NULL) {
// TODO(benh): It's still unclear whether or not
// MasterDetector::detectMaster will cause spurious
@@ -907,52 +815,24 @@ void Master::reregisterSlave(const Slave
LOG(ERROR) << "Slave at " << from()
<< " attempted to re-register with an already in use id ("
<< slaveId << ")";
- send(from(), process::TERMINATE);
+ send(from(), TERMINATE);
} else {
Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsedTime());
- LOG(INFO) << "Attempting to re-register slave " << slave->slaveId
+ LOG(INFO) << "Attempting to re-register slave " << slave->id
<< " at " << slave->pid;
- struct SlaveReregistrar
- {
- static bool run(Slave* slave,
- const vector<Task>& tasks,
- const PID<Master>& master)
- {
- // TODO(benh): Do a reverse lookup to ensure IP maps to
- // hostname, or check credentials of this slave.
- process::dispatch(master, &Master::readdSlave, slave, tasks);
- }
-
- static bool run(Slave* slave,
- const vector<Task>& tasks,
- const PID<Master>& master,
- const PID<SlavesManager>& slavesManager)
- {
- if (!process::call(slavesManager, &SlavesManager::add,
- slave->info.hostname(), slave->pid.port)) {
- LOG(WARNING) << "Could not register slave because failed"
- << " to add it to the slaves maanger";
- delete slave;
- return false;
- }
-
- return run(slave, tasks, master);
- }
- };
-
- // Check whether this slave can be accepted, or if all slaves are accepted.
+ // Accept the slave if it was explicitly activated, or if all slaves ("*") are allowed.
if (slaveHostnamePorts.count(slaveInfo.hostname(), from().port) > 0) {
- process::run(&SlaveReregistrar::run, slave, tasks, self());
+ run(&SlaveReregistrar::run, slave, tasks, self());
} else if (conf.get<string>("slaves", "*") == "*") {
- process::run(&SlaveReregistrar::run,
- slave, tasks, self(), slavesManager->self());
+ run(&SlaveReregistrar::run,
+ slave, tasks, self(), slavesManager->self());
} else {
LOG(WARNING) << "Cannot re-register slave at "
<< slaveInfo.hostname() << ":" << from().port
<< " because not in allocated set of slaves!";
- send(from(), process::TERMINATE);
+ send(from(), TERMINATE);
}
}
}
@@ -965,71 +845,65 @@ void Master::unregisterSlave(const Slave
// TODO(benh): Check that only the slave is asking to unregister?
- Slave* slave = lookupSlave(slaveId);
+ Slave* slave = getSlave(slaveId);
if (slave != NULL) {
removeSlave(slave);
}
}
-void Master::statusUpdate(const FrameworkID& frameworkId,
- const TaskStatus& status)
+void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
{
- LOG(INFO) << "Status update: task " << status.task_id()
- << " of framework " << frameworkId
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+ const TaskStatus& status = update.status();
- Slave* slave = lookupSlave(status.slave_id());
+ LOG(INFO) << "Status update from " << from()
+ << ": task " << status.task_id()
+ << " of framework " << update.framework_id()
+ << " is now in state " << status.state();
+
+ Slave* slave = getSlave(update.slave_id());
if (slave != NULL) {
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(update.framework_id());
if (framework != NULL) {
// Pass on the (transformed) status update to the framework.
- MSG<M2F_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_status()->MergeFrom(status);
- send(framework->pid, out);
+ StatusUpdateMessage message;
+ message.mutable_update()->MergeFrom(update);
+ message.set_pid(pid);
+ send(framework->pid, message);
// Lookup the task and see if we need to update anything locally.
- Task* task = slave->lookupTask(frameworkId, status.task_id());
+ Task* task = slave->getTask(update.framework_id(), status.task_id());
if (task != NULL) {
task->set_state(status.state());
- // Remove the task if necessary, and update statistics.
- switch (status.state()) {
- case TASK_FINISHED:
- statistics.finished_tasks++;
- removeTask(task, TRR_TASK_ENDED);
- break;
- case TASK_FAILED:
- statistics.failed_tasks++;
- removeTask(task, TRR_TASK_ENDED);
- break;
- case TASK_KILLED:
- statistics.killed_tasks++;
- removeTask(task, TRR_TASK_ENDED);
- break;
- case TASK_LOST:
- statistics.lost_tasks++;
- removeTask(task, TRR_TASK_ENDED);
- break;
- }
+ // Handle the task appropriately if it's terminated.
+ if (status.state() == TASK_FINISHED ||
+ status.state() == TASK_FAILED ||
+ status.state() == TASK_KILLED ||
+ status.state() == TASK_LOST) {
+ removeTask(framework, slave, task, TRR_TASK_ENDED);
+ }
+
+ stats.tasks[status.state()]++;
- statistics.valid_status_updates++;
+ stats.validStatusUpdates++;
} else {
- LOG(WARNING) << "Status update error: couldn't lookup "
+ LOG(WARNING) << "Status update from " << from()
+ << ": error, couldn't lookup "
<< "task " << status.task_id();
- statistics.invalid_status_updates++;
+ stats.invalidStatusUpdates++;
}
} else {
- LOG(WARNING) << "Status update error: couldn't lookup "
- << "framework " << frameworkId;
- statistics.invalid_status_updates++;
+ LOG(WARNING) << "Status update from " << from()
+ << ": error, couldn't lookup "
+ << "framework " << update.framework_id();
+ stats.invalidStatusUpdates++;
}
} else {
- LOG(WARNING) << "Status update error: couldn't lookup slave "
- << status.slave_id();
- statistics.invalid_status_updates++;
+ LOG(WARNING) << "Status update from " << from()
+ << ": error, couldn't lookup slave "
+ << update.slave_id();
+ stats.invalidStatusUpdates++;
}
}
@@ -1039,31 +913,31 @@ void Master::executorMessage(const Slave
const ExecutorID& executorId,
const string& data)
{
- Slave* slave = lookupSlave(slaveId);
+ Slave* slave = getSlave(slaveId);
if (slave != NULL) {
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Sending framework message from slave " << slaveId
<< " to framework " << frameworkId;
- MSG<M2S_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(framework->pid, out);
+ ExecutorToFrameworkMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(framework->pid, message);
- statistics.valid_framework_messages++;
+ stats.validFrameworkMessages++;
} else {
LOG(WARNING) << "Cannot send framework message from slave "
<< slaveId << " to framework " << frameworkId
<< " because framework does not exist";
- statistics.invalid_framework_messages++;
+ stats.invalidFrameworkMessages++;
}
} else {
LOG(WARNING) << "Cannot send framework message from slave "
<< slaveId << " to framework " << frameworkId
<< " because slave does not exist";
- statistics.invalid_framework_messages++;
+ stats.invalidFrameworkMessages++;
}
}
@@ -1071,35 +945,53 @@ void Master::executorMessage(const Slave
void Master::exitedExecutor(const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int32_t result)
+ int32_t status)
{
- Slave* slave = lookupSlave(slaveId);
+ // TODO(benh): Send status updates for the tasks running under this
+ // executor from the slave! Maybe requires adding an extra "reason"
+ // so people can see the tasks were lost because the executor exited.
+
+ Slave* slave = getSlave(slaveId);
if (slave != NULL) {
- Framework* framework = lookupFramework(frameworkId);
+ Framework* framework = getFramework(frameworkId);
if (framework != NULL) {
LOG(INFO) << "Executor " << executorId
- << " of framework " << framework->frameworkId
- << " on slave " << slave->slaveId
+ << " of framework " << framework->id
+ << " on slave " << slave->id
<< " (" << slave->info.hostname() << ") "
- << "exited with result " << result;
+ << "exited with status " << status;
+
+ // TODO(benh): What about a status update that is on its way
+ // from the slave but got re-ordered on the wire? By sending
+ // these TASK_LOST updates here we may prevent the real status of
+ // tasks that actually finished from reaching the scheduler.
+ // Instead, it seems like perhaps the right thing to do is to have
+ // the slave be responsible for sending those status updates,
+ // and have the master (or better yet ... and soon ... the
+ // scheduler) decide that a task is dead only when a slave lost
+ // has occurred.
// Tell the framework which tasks have been lost.
- foreachpaircopy (_, Task* task, framework->tasks) {
- if (task->slave_id() == slave->slaveId &&
+ foreachvalue (Task* task, utils::copy(framework->tasks)) {
+ if (task->slave_id() == slave->id &&
task->executor_id() == executorId) {
- MSG<M2F_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(task->framework_id());
- TaskStatus* status = out.mutable_status();
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(task->framework_id());
+ update->mutable_executor_id()->MergeFrom(task->executor_id());
+ update->mutable_slave_id()->MergeFrom(task->slave_id());
+ TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
- status->mutable_slave_id()->MergeFrom(task->slave_id());
status->set_state(TASK_LOST);
- send(framework->pid, out);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
LOG(INFO) << "Removing task " << task->task_id()
<< " of framework " << frameworkId
<< " because of lost executor";
- removeTask(task, TRR_EXECUTOR_LOST);
+ removeTask(framework, slave, task, TRR_EXECUTOR_LOST);
}
}
@@ -1119,16 +1011,17 @@ void Master::activatedSlaveHostnamePort(
}
-void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port)
+void Master::deactivatedSlaveHostnamePort(const string& hostname,
+ uint16_t port)
{
if (slaveHostnamePorts.count(hostname, port) > 0) {
// Look for a connected slave and remove it.
- foreachpair (_, Slave* slave, slaves) {
+ foreachvalue (Slave* slave, slaves) {
if (slave->info.hostname() == hostname && slave->pid.port == port) {
- LOG(WARNING) << "Removing slave " << slave->slaveId << " at "
+ LOG(WARNING) << "Removing slave " << slave->id << " at "
<< hostname << ":" << port
<< " because it has been deactivated";
- send(slave->pid, process::TERMINATE);
+ send(slave->pid, TERMINATE);
removeSlave(slave);
break;
}
@@ -1144,21 +1037,26 @@ void Master::deactivatedSlaveHostnamePor
void Master::timerTick()
{
// Check which framework filters can be expired.
- foreachpair (_, Framework* framework, frameworks) {
+ foreachvalue (Framework* framework, frameworks) {
framework->removeExpiredFilters(elapsedTime());
}
// Do allocations!
allocator->timerTick();
+
+ // Schedule another timer tick.
+ delay(1.0, self(), &Master::timerTick);
}
-void Master::frameworkExpired(const FrameworkID& frameworkId)
+void Master::frameworkFailoverTimeout(const FrameworkID& frameworkId,
+ double reregisteredTime)
{
- Framework* framework = lookupFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Framework failover timer expired, removing "
- << framework;
+ Framework* framework = getFramework(frameworkId);
+ if (framework != NULL && !framework->active &&
+ framework->reregisteredTime == reregisteredTime) {
+ LOG(INFO) << "Framework failover timeout, removing framework "
+ << framework->id;
removeFramework(framework);
}
}
@@ -1166,340 +1064,120 @@ void Master::frameworkExpired(const Fram
void Master::exited()
{
- // TODO(benh): Could we get PROCESS_EXIT from a network partition?
- LOG(INFO) << "Process exited: " << from();
- if (pidToFrameworkId.count(from()) > 0) {
- const FrameworkID& frameworkId = pidToFrameworkId[from()];
- Framework* framework = lookupFramework(frameworkId);
- if (framework != NULL) {
- LOG(INFO) << "Framework " << frameworkId << " disconnected";
+ foreachvalue (Framework* framework, frameworks) {
+ if (framework->pid == from()) {
+ LOG(INFO) << "Framework " << framework->id << " disconnected";
+
+// removeFramework(framework);
// Stop sending offers here for now.
framework->active = false;
+ // Delay dispatching a message to ourselves for the timeout.
+ delay(FRAMEWORK_FAILOVER_TIMEOUT, self(),
+ &Master::frameworkFailoverTimeout,
+ framework->id, framework->reregisteredTime);
+
// Remove the framework's slot offers.
- foreachcopy (SlotOffer* offer, framework->slotOffers) {
- removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ removeOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
-
-// framework->failoverTimer =
-// new FrameworkFailoverTimer(self(), frameworkId);
-// link(spawn(framework->failoverTimer));
- removeFramework(framework);
+ return;
}
- } else if (pidToSlaveId.count(from()) > 0) {
- const SlaveID& slaveId = pidToSlaveId[from()];
- Slave* slave = lookupSlave(slaveId);
- if (slave != NULL) {
- LOG(INFO) << slave << " disconnected";
+ }
+
+ foreachvalue (Slave* slave, slaves) {
+ if (slave->pid == from()) {
+ LOG(INFO) << "Slave " << slave->id << " disconnected";
removeSlave(slave);
- }
- } else {
- foreachpair (_, Framework* framework, frameworks) {
- if (framework->failoverTimer != NULL &&
- framework->failoverTimer->self() == from()) {
- LOG(ERROR) << "Bad framework failover timer, removing "
- << framework;
- removeFramework(framework);
- break;
- }
+ return;
}
}
}
-Promise<HttpResponse> Master::http_info_json(const HttpRequest& request)
+OfferID Master::makeOffer(Framework* framework,
+ const vector<SlaveResources>& resources)
{
- LOG(INFO) << "HTTP request for '/master/info.json'";
-
- ostringstream out;
-
- out <<
- "{" <<
- "\"built_date\":\"" << build::DATE << "\"," <<
- "\"build_user\":\"" << build::USER << "\"," <<
- "\"start_time\":\"" << startTime << "\"," <<
- "\"pid\":\"" << self() << "\"" <<
- "}";
+ const OfferID& offerId = newOfferId();
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
-}
+ Offer* offer = new Offer(offerId, framework->id, resources);
+ offers[offer->id] = offer;
+ framework->addOffer(offer);
-Promise<HttpResponse> Master::http_frameworks_json(const HttpRequest& request)
-{
- LOG(INFO) << "HTTP request for '/master/frameworks.json'";
+ // Update the resource information within each of the slave objects. Gross!
+ foreach (const SlaveResources& r, resources) {
+ r.slave->offers.insert(offer);
+ r.slave->resourcesOffered += r.resources;
+ }
- ostringstream out;
+ LOG(INFO) << "Sending offer " << offer->id
+ << " to framework " << framework->id;
- out << "[";
+ ResourceOfferMessage message;
+ message.mutable_offer_id()->MergeFrom(offerId);
- foreachpair (_, Framework* framework, frameworks) {
- out <<
- "{" <<
- "\"id\":\"" << framework->frameworkId << "\"," <<
- "\"name\":\"" << framework->info.name() << "\"," <<
- "\"user\":\"" << framework->info.user() << "\""
- "},";
- }
+ foreach (const SlaveResources& r, resources) {
+ SlaveOffer* offer = message.add_offers();
+ offer->mutable_slave_id()->MergeFrom(r.slave->id);
+ offer->set_hostname(r.slave->info.hostname());
+ offer->mutable_resources()->MergeFrom(r.resources);
- // Backup the put pointer to overwrite the last comma (hack).
- if (frameworks.size() > 0) {
- long pos = out.tellp();
- out.seekp(pos - 1);
+ message.add_pids(r.slave->pid);
}
- out << "]";
+ send(framework->pid, message);
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
+ return offerId;
}
-Promise<HttpResponse> Master::http_slaves_json(const HttpRequest& request)
+// Process a resource offer reply (for a non-cancelled offer) by launching
+// the desired tasks (if the offer contains a valid set of tasks) and
+// reporting any unused resources to the allocator.
+void Master::processOfferReply(Offer* offer,
+ const vector<TaskDescription>& tasks,
+ const Params& params)
{
- LOG(INFO) << "HTTP request for '/master/slaves.json'";
-
- ostringstream out;
+ LOG(INFO) << "Received reply for " << offer;
- out << "[";
+ Framework* framework = getFramework(offer->frameworkId);
+ CHECK(framework != NULL);
- foreachpair (_, Slave* slave, slaves) {
- // TODO(benh): Send all of the resources (as JSON).
- Resources resources(slave->info.resources());
- Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
- Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
- out <<
- "{" <<
- "\"id\":\"" << slave->slaveId << "\"," <<
- "\"hostname\":\"" << slave->info.hostname() << "\"," <<
- "\"cpus\":" << cpus.value() << "," <<
- "\"mem\":" << mem.value() <<
- "},";
+ // Count resources in the offer.
+ hashmap<Slave*, Resources> resourcesOffered;
+ foreach (const SlaveResources& r, offer->resources) {
+ resourcesOffered[r.slave] = r.resources;
}
- // Backup the put pointer to overwrite the last comma (hack).
- if (slaves.size() > 0) {
- long pos = out.tellp();
- out.seekp(pos - 1);
- }
+ // Count used resources and check that its tasks are valid.
+ hashmap<Slave*, Resources> resourcesUsed;
+ foreach (const TaskDescription& task, tasks) {
+ // Check whether the task is on a valid slave.
+ Slave* slave = getSlave(task.slave_id());
+ if (slave == NULL || resourcesOffered.count(slave) == 0) {
+ terminateFramework(framework, 0, "Invalid slave in offer reply");
+ return;
+ }
- out << "]";
+ // Check whether or not the resources for the task are valid.
+ // TODO(benh): In the future maybe we can also augment the
+ // protobuf to deal with fragmentation purposes by providing some
+ // sort of minimum amount of resources required per task.
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
-}
+ if (task.resources().size() == 0) {
+ terminateFramework(framework, 0, "Invalid resources for task");
+ return;
+ }
-
-Promise<HttpResponse> Master::http_tasks_json(const HttpRequest& request)
-{
- LOG(INFO) << "HTTP request for '/master/tasks.json'";
-
- ostringstream out;
-
- out << "[";
-
- foreachpair (_, Framework* framework, frameworks) {
- foreachpair (_, Task* task, framework->tasks) {
- // TODO(benh): Send all of the resources (as JSON).
- Resources resources(task->resources());
- Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
- Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
- const string& state =
- TaskState_descriptor()->FindValueByNumber(task->state())->name();
- out <<
- "{" <<
- "\"task_id\":\"" << task->task_id() << "\"," <<
- "\"framework_id\":\"" << task->framework_id() << "\"," <<
- "\"slave_id\":\"" << task->slave_id() << "\"," <<
- "\"name\":\"" << task->name() << "\"," <<
- "\"state\":\"" << state << "\"," <<
- "\"cpus\":" << cpus.value() << "," <<
- "\"mem\":" << mem.value() <<
- "},";
- }
- }
-
- // Backup the put pointer to overwrite the last comma (hack).
- if (frameworks.size() > 0) {
- long pos = out.tellp();
- out.seekp(pos - 1);
- }
-
- out << "]";
-
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
-}
-
-
-Promise<HttpResponse> Master::http_stats_json(const HttpRequest& request)
-{
- LOG(INFO) << "Http request for '/master/stats.json'";
-
- ostringstream out;
-
- out <<
- "{" <<
- "\"uptime\":" << elapsedTime() - startTime << "," <<
- "\"total_schedulers\":" << frameworks.size() << "," <<
- "\"active_schedulers\":" << getActiveFrameworks().size() << "," <<
- "\"activated_slaves\":" << slaveHostnamePorts.size() << "," <<
- "\"connected_slaves\":" << slaves.size() << "," <<
- "\"launched_tasks\":" << statistics.launched_tasks << "," <<
- "\"finished_tasks\":" << statistics.finished_tasks << "," <<
- "\"killed_tasks\":" << statistics.killed_tasks << "," <<
- "\"failed_tasks\":" << statistics.failed_tasks << "," <<
- "\"lost_tasks\":" << statistics.lost_tasks << "," <<
- "\"valid_status_updates\":" << statistics.valid_status_updates << "," <<
- "\"invalid_status_updates\":" << statistics.invalid_status_updates << "," <<
- "\"valid_framework_messages\":" << statistics.valid_framework_messages << "," <<
- "\"invalid_framework_messages\":" << statistics.invalid_framework_messages <<
- "}";
-
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
-}
-
-
-Promise<HttpResponse> Master::http_vars(const HttpRequest& request)
-{
- LOG(INFO) << "HTTP request for '/master/vars'";
-
- ostringstream out;
-
- out <<
- "build_date " << build::DATE << "\n" <<
- "build_user " << build::USER << "\n" <<
- "build_flags " << build::FLAGS << "\n";
-
- // Also add the configuration values.
- foreachpair (const string& key, const string& value, conf.getMap()) {
- out << key << " " << value << "\n";
- }
-
- out <<
- "uptime " << elapsedTime() - startTime << "\n" <<
- "total_schedulers " << frameworks.size() << "\n" <<
- "active_schedulers " << getActiveFrameworks().size() << "\n" <<
- "activated_slaves " << slaveHostnamePorts.size() << "\n" <<
- "connected_slaves " << slaves.size() << "\n" <<
- "launched_tasks " << statistics.launched_tasks << "\n" <<
- "finished_tasks " << statistics.finished_tasks << "\n" <<
- "killed_tasks " << statistics.killed_tasks << "\n" <<
- "failed_tasks " << statistics.failed_tasks << "\n" <<
- "lost_tasks " << statistics.lost_tasks << "\n" <<
- "valid_status_updates " << statistics.valid_status_updates << "\n" <<
- "invalid_status_updates " << statistics.invalid_status_updates << "\n" <<
- "valid_framework_messages " << statistics.valid_framework_messages << "\n" <<
- "invalid_framework_messages " << statistics.invalid_framework_messages << "\n";
-
- HttpOKResponse response;
- response.headers["Content-Type"] = "text/plain";
- response.headers["Content-Length"] = lexical_cast<string>(out.str().size());
- response.body = out.str().data();
- return response;
-}
-
-
-OfferID Master::makeOffer(Framework* framework,
- const vector<SlaveResources>& resources)
-{
- const OfferID& offerId = newOfferId();
-
- SlotOffer* offer = new SlotOffer(offerId, framework->frameworkId, resources);
-
- slotOffers[offer->offerId] = offer;
- framework->addOffer(offer);
-
- // Update the resource information within each of the slave objects. Gross!
- foreach (const SlaveResources& r, resources) {
- r.slave->slotOffers.insert(offer);
- r.slave->resourcesOffered += r.resources;
- }
-
- LOG(INFO) << "Sending offer " << offer->offerId
- << " to framework " << framework->frameworkId;
-
- MSG<M2F_RESOURCE_OFFER> out;
- out.mutable_offer_id()->MergeFrom(offerId);
-
- foreach (const SlaveResources& r, resources) {
- SlaveOffer* offer = out.add_offers();
- offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
- offer->set_hostname(r.slave->info.hostname());
- offer->mutable_resources()->MergeFrom(r.resources);
-
- out.add_pids(r.slave->pid);
- }
-
- send(framework->pid, out);
-
- return offerId;
-}
-
-
-// Process a resource offer reply (for a non-cancelled offer) by launching
-// the desired tasks (if the offer contains a valid set of tasks) and
-// reporting any unused resources to the allocator.
-void Master::processOfferReply(SlotOffer* offer,
- const vector<TaskDescription>& tasks,
- const Params& params)
-{
- LOG(INFO) << "Received reply for " << offer;
-
- Framework* framework = lookupFramework(offer->frameworkId);
- CHECK(framework != NULL);
-
- // Count resources in the offer.
- unordered_map<Slave*, Resources> resourcesOffered;
- foreach (const SlaveResources& r, offer->resources) {
- resourcesOffered[r.slave] = r.resources;
- }
-
- // Count used resources and check that its tasks are valid.
- unordered_map<Slave*, Resources> resourcesUsed;
- foreach (const TaskDescription& task, tasks) {
- // Check whether the task is on a valid slave.
- Slave* slave = lookupSlave(task.slave_id());
- if (slave == NULL || resourcesOffered.count(slave) == 0) {
- terminateFramework(framework, 0, "Invalid slave in offer reply");
- return;
- }
-
- // Check whether or not the resources for the task are valid.
- // TODO(benh): In the future maybe we can also augment the
- // protobuf to deal with fragmentation purposes by providing some
- // sort of minimum amount of resources required per task.
-
- if (task.resources().size() == 0) {
- terminateFramework(framework, 0, "Invalid resources for task");
- return;
- }
-
- foreach (const Resource& resource, task.resources()) {
- if (!Resources::isAllocatable(resource)) {
- // TODO(benh): Also send back the invalid resources as a string?
- terminateFramework(framework, 0, "Invalid resources for task");
- return;
- }
- }
+ foreach (const Resource& resource, task.resources()) {
+ if (!Resources::isAllocatable(resource)) {
+ // TODO(benh): Also send back the invalid resources as a string?
+ terminateFramework(framework, 0, "Invalid resources for task");
+ return;
+ }
+ }
resourcesUsed[slave] += task.resources();
}
@@ -1513,12 +1191,12 @@ void Master::processOfferReply(SlotOffer
}
// Check that there are no duplicate task IDs.
- unordered_set<TaskID> idsInResponse;
+ hashset<TaskID> idsInResponse;
foreach (const TaskDescription& task, tasks) {
if (framework->tasks.count(task.task_id()) > 0 ||
idsInResponse.count(task.task_id()) > 0) {
- terminateFramework(framework, 0, "Duplicate task ID: " +
- lexical_cast<string>(task.task_id()));
+ string error = "Duplicate task ID: " + task.task_id().value();
+ terminateFramework(framework, 0, error);
return;
}
idsInResponse.insert(task.task_id());
@@ -1531,11 +1209,18 @@ void Master::processOfferReply(SlotOffer
// Get out the timeout for left over resources (if exists), and use
// that to calculate the expiry timeout.
- int timeout = DEFAULT_REFUSAL_TIMEOUT;
+ double timeout = DEFAULT_REFUSAL_TIMEOUT;
for (int i = 0; i < params.param_size(); i++) {
if (params.param(i).key() == "timeout") {
- timeout = lexical_cast<int>(params.param(i).value());
+ try {
+ timeout = boost::lexical_cast<double>(params.param(i).value());
+ } catch (boost::bad_lexical_cast&) {
+ string error = "Failed to convert value '" +
+ params.param(i).value() + "' for key 'timeout' to an integer";
+ terminateFramework(framework, 0, error);
+ return;
+ }
break;
}
}
@@ -1566,7 +1251,7 @@ void Master::processOfferReply(SlotOffer
}
// Return the resources left to the allocator.
- removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
+ removeOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesUnused);
}
@@ -1575,7 +1260,7 @@ void Master::launchTask(Framework* frame
// The invariant right now is that launchTask is called only for
// TaskDescriptions where the slave is still valid (see the code
// above in processOfferReply).
- Slave* slave = lookupSlave(task.slave_id());
+ Slave* slave = getSlave(task.slave_id());
CHECK(slave != NULL);
// Determine the executor ID for this task.
@@ -1584,7 +1269,7 @@ void Master::launchTask(Framework* frame
: framework->info.executor().executor_id();
Task* t = new Task();
- t->mutable_framework_id()->MergeFrom(framework->frameworkId);
+ t->mutable_framework_id()->MergeFrom(framework->id);
t->mutable_executor_id()->MergeFrom(executorId);
t->set_state(TASK_STARTING);
t->set_name(task.name());
@@ -1599,79 +1284,28 @@ void Master::launchTask(Framework* frame
LOG(INFO) << "Launching " << t << " on " << slave;
- MSG<M2S_RUN_TASK> out;
- out.mutable_framework()->MergeFrom(framework->info);
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- out.mutable_task()->MergeFrom(task);
- send(slave->pid, out);
-
- statistics.launched_tasks++;
-}
-
-
-// Terminate a framework, sending it a particular error message
-// TODO: Make the error codes and messages programmer-friendly
-void Master::terminateFramework(Framework* framework,
- int32_t code,
- const string& message)
-{
- LOG(INFO) << "Terminating " << framework << " due to error: " << message;
-
- MSG<M2F_ERROR> out;
- out.set_code(code);
- out.set_message(message);
- send(framework->pid, out);
+ RunTaskMessage message;
+ message.mutable_framework()->MergeFrom(framework->info);
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ message.mutable_task()->MergeFrom(task);
+ send(slave->pid, message);
- removeFramework(framework);
-}
-
-
-// Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeSlotOffer(SlotOffer* offer,
- OfferReturnReason reason,
- const vector<SlaveResources>& resourcesUnused)
-{
- // Remove from slaves.
- foreach (SlaveResources& r, offer->resources) {
- CHECK(r.slave != NULL);
- r.slave->resourcesOffered -= r.resources;
- r.slave->slotOffers.erase(offer);
- }
-
- // Remove from framework
- Framework *framework = lookupFramework(offer->frameworkId);
- CHECK(framework != NULL);
- framework->removeOffer(offer);
-
- // Also send framework a rescind message unless the reason we are
- // removing the offer is that the framework replied to it
- if (reason != ORR_FRAMEWORK_REPLIED) {
- MSG<M2F_RESCIND_OFFER> out;
- out.mutable_offer_id()->MergeFrom(offer->offerId);
- send(framework->pid, out);
- }
-
- // Tell the allocator about the unused resources.
- allocator->offerReturned(offer, reason, resourcesUnused);
-
- // Delete it
- slotOffers.erase(offer->offerId);
- delete offer;
+ stats.tasks[TASK_STARTING]++;
}
void Master::addFramework(Framework* framework)
{
- CHECK(frameworks.count(framework->frameworkId) == 0);
+ CHECK(frameworks.count(framework->id) == 0);
+
+ frameworks[framework->id] = framework;
- frameworks[framework->frameworkId] = framework;
- pidToFrameworkId[framework->pid] = framework->frameworkId;
link(framework->pid);
- MSG<M2F_REGISTER_REPLY> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- send(framework->pid, out);
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ send(framework->pid, message);
allocator->frameworkAdded(framework);
}
@@ -1683,103 +1317,115 @@ void Master::failoverFramework(Framework
{
const UPID& oldPid = framework->pid;
- // Remove the framework's slot offers (if they weren't removed before)..
+ // Remove the framework's slot offers (if they weren't removed before).
// TODO(benh): Consider just reoffering these to the new framework.
- foreachcopy (SlotOffer* offer, framework->slotOffers) {
- removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ removeOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
- MSG<M2F_ERROR> out;
- out.set_code(1);
- out.set_message("Framework failover");
- send(oldPid, out);
+ {
+ FrameworkErrorMessage message;
+ message.set_code(1);
+ message.set_message("Framework failover");
+ send(oldPid, message);
+ }
// TODO(benh): unlink(oldPid);
- pidToFrameworkId.erase(oldPid);
- pidToFrameworkId[newPid] = framework->frameworkId;
framework->pid = newPid;
link(newPid);
- // Kill the failover timer.
- if (framework->failoverTimer != NULL) {
- process::post(framework->failoverTimer->self(), process::TERMINATE);
- process::wait(framework->failoverTimer->self());
- delete framework->failoverTimer;
- framework->failoverTimer = NULL;
- }
-
// Make sure we can get offers again.
framework->active = true;
- MSG<M2F_REGISTER_REPLY> reply;
- reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
- send(newPid, reply);
+ framework->reregisteredTime = elapsedTime();
+
+ FrameworkRegisteredMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ send(newPid, message);
+}
+
+
+// Terminates a framework: reports `error` (with `code`) to its
+// scheduler in a FrameworkErrorMessage and then removes the framework
+// from the master, which deletes the Framework object.
+void Master::terminateFramework(Framework* framework,
+ int32_t code,
+ const string& error)
+{
+ LOG(INFO) << "Terminating " << framework << " due to error: " << error;
+
+ // Notify first: `framework` is deleted by removeFramework() below.
+ FrameworkErrorMessage errorMessage;
+ errorMessage.set_message(error);
+ errorMessage.set_code(code);
+ send(framework->pid, errorMessage);
+
+ removeFramework(framework);
+}
-// Kill all of a framework's tasks, delete the framework object, and
-// reschedule slot offers for slots that were assigned to this framework
+// Removes a framework from the master entirely: asks every slave to
+// shut it down, removes its tasks (TRR_FRAMEWORK_LOST) and any
+// outstanding offers (ORR_FRAMEWORK_LOST), erases it from `frameworks`,
+// informs the allocator and deletes the Framework object. `framework`
+// must not be used after this returns.
void Master::removeFramework(Framework* framework)
-{
+{
framework->active = false;
// TODO: Notify allocator that a framework removal is beginning?
- // Tell slaves to kill the framework
- foreachpair (_, Slave *slave, slaves) {
- MSG<M2S_KILL_FRAMEWORK> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- send(slave->pid, out);
+ // Tell slaves to shutdown the framework.
+ foreachvalue (Slave* slave, slaves) {
+ ShutdownFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ send(slave->pid, message);
}
// Remove pointers to the framework's tasks in slaves
- foreachpaircopy (_, Task *task, framework->tasks) {
- Slave *slave = lookupSlave(task->slave_id());
+ // Iterate over a copy: removeTask() calls framework->removeTask(),
+ // which presumably erases from framework->tasks while we loop.
+ foreachvalue (Task* task, utils::copy(framework->tasks)) {
+ Slave* slave = getSlave(task->slave_id());
CHECK(slave != NULL);
- removeTask(task, TRR_FRAMEWORK_LOST);
+ removeTask(framework, slave, task, TRR_FRAMEWORK_LOST);
}
// Remove the framework's slot offers (if they weren't removed before).
- foreachcopy (SlotOffer* offer, framework->slotOffers) {
- removeSlotOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
+ // Iterate over a copy: removeOffer() calls framework->removeOffer(),
+ // which presumably modifies framework->offers while we loop.
+ foreach (Offer* offer, utils::copy(framework->offers)) {
+ removeOffer(offer, ORR_FRAMEWORK_LOST, offer->resources);
}
// TODO(benh): Similar code between removeFramework and
// failoverFramework needs to be shared!
// TODO(benh): unlink(framework->pid);
- pidToFrameworkId.erase(framework->pid);
// Delete it.
- frameworks.erase(framework->frameworkId);
+ frameworks.erase(framework->id);
allocator->frameworkRemoved(framework);
delete framework;
}
-void Master::addSlave(Slave* slave)
+// Adds a slave to the master: records it in `slaves`, links to its pid,
+// informs the allocator, acknowledges it with a SlaveRegisteredMessage
+// (or a SlaveReregisteredMessage when `reregister` is true, as
+// readdSlave does), and spawns a SlaveObserver for it.
+void Master::addSlave(Slave* slave, bool reregister)
{
CHECK(slave != NULL);
- slaves[slave->slaveId] = slave;
- pidToSlaveId[slave->pid] = slave->slaveId;
+ slaves[slave->id] = slave;
+
link(slave->pid);
allocator->slaveAdded(slave);
- MSG<M2S_REGISTER_REPLY> out;
- out.mutable_slave_id()->MergeFrom(slave->slaveId);
- send(slave->pid, out);
+ // The reply type tells the slave whether this was a first-time
+ // registration or a re-registration; both carry the slave id.
+ if (!reregister) {
+ SlaveRegisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+ } else {
+ SlaveReregisteredMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(slave->pid, message);
+ }
// TODO(benh):
// // Ask the slaves manager to monitor this slave for us.
- // process::dispatch(slavesManager->self(), &SlavesManager::monitor,
- // slave->pid, slave->info, slave->slaveId);
+ // dispatch(slavesManager->self(), &SlavesManager::monitor,
+ // slave->pid, slave->info, slave->id);
// Set up an observer for the slave.
slave->observer = new SlaveObserver(slave->pid, slave->info,
- slave->slaveId, slavesManager->self());
- process::spawn(slave->observer);
+ slave->id, slavesManager->self());
+ spawn(slave->observer);
}
@@ -1787,7 +1433,7 @@ void Master::readdSlave(Slave* slave, co
{
CHECK(slave != NULL);
- addSlave(slave);
+ addSlave(slave, true);
for (int i = 0; i < tasks.size(); i++) {
Task* task = new Task(tasks[i]);
@@ -1801,20 +1447,20 @@ void Master::readdSlave(Slave* slave, co
// will add them then. We also tell this slave the current
// framework pid for this task. Again, we do the same thing
// if a framework currently isn't registered.
- Framework* framework = lookupFramework(task->framework_id());
+ Framework* framework = getFramework(task->framework_id());
if (framework != NULL) {
framework->addTask(task);
- MSG<M2S_UPDATE_FRAMEWORK> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- send(slave->pid, out);
+ UpdateFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(framework->id);
+ message.set_pid(framework->pid);
+ send(slave->pid, message);
} else {
// TODO(benh): We should really put a timeout on how long we
// keep tasks running on a slave that never have frameworks
// reregister and claim them.
LOG(WARNING) << "Possibly orphaned task " << task->task_id()
<< " of framework " << task->framework_id()
- << " running on slave " << slave->slaveId;
+ << " running on slave " << slave->id;
}
}
}
@@ -1828,8 +1474,8 @@ void Master::removeSlave(Slave* slave)
// TODO: Notify allocator that a slave removal is beginning?
// Remove pointers to slave's tasks in frameworks, and send status updates
- foreachpaircopy (_, Task* task, slave->tasks) {
- Framework *framework = lookupFramework(task->framework_id());
+ foreachvalue (Task* task, utils::copy(slave->tasks)) {
+ Framework* framework = getFramework(task->framework_id());
// A framework might not actually exist because the master failed
// over and the framework hasn't reconnected. This can be a tricky
// situation for frameworks that want to have high-availability,
@@ -1837,21 +1483,25 @@ void Master::removeSlave(Slave* slave)
// status update about this task. Perhaps in the future what we
// want to do is create a local Framework object to represent that
// framework until it fails over. See the TODO above in
- // S2M_REREGISTER_SLAVE.
+ // Master::reregisterSlave.
if (framework != NULL) {
- MSG<M2F_STATUS_UPDATE> out;
- out.mutable_framework_id()->MergeFrom(task->framework_id());
- TaskStatus* status = out.mutable_status();
+ StatusUpdateMessage message;
+ StatusUpdate* update = message.mutable_update();
+ update->mutable_framework_id()->MergeFrom(task->framework_id());
+ update->mutable_executor_id()->MergeFrom(task->executor_id());
+ update->mutable_slave_id()->MergeFrom(task->slave_id());
+ TaskStatus* status = update->mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
- status->mutable_slave_id()->MergeFrom(task->slave_id());
status->set_state(TASK_LOST);
- send(framework->pid, out);
+ update->set_timestamp(elapsedTime());
+ update->set_uuid(UUID::random().toBytes());
+ send(framework->pid, message);
}
- removeTask(task, TRR_SLAVE_LOST);
+ removeTask(framework, slave, task, TRR_SLAVE_LOST);
}
// Remove slot offers from the slave; this will also rescind them
- foreachcopy (SlotOffer* offer, slave->slotOffers) {
+ foreach (Offer* offer, utils::copy(slave->offers)) {
// Only report resources on slaves other than this one to the allocator
vector<SlaveResources> otherSlaveResources;
foreach (const SlaveResources& r, offer->resources) {
@@ -1859,49 +1509,48 @@ void Master::removeSlave(Slave* slave)
otherSlaveResources.push_back(r);
}
}
- removeSlotOffer(offer, ORR_SLAVE_LOST, otherSlaveResources);
+ removeOffer(offer, ORR_SLAVE_LOST, otherSlaveResources);
}
// Remove slave from any filters
- foreachpair (_, Framework* framework, frameworks) {
+ foreachvalue (Framework* framework, frameworks) {
framework->slaveFilter.erase(slave);
}
// Send lost-slave message to all frameworks (this helps them re-run
// previously finished tasks whose output was on the lost slave)
- foreachpair (_, Framework* framework, frameworks) {
- MSG<M2F_LOST_SLAVE> out;
- out.mutable_slave_id()->MergeFrom(slave->slaveId);
- send(framework->pid, out);
+ foreachvalue (Framework* framework, frameworks) {
+ LostSlaveMessage message;
+ message.mutable_slave_id()->MergeFrom(slave->id);
+ send(framework->pid, message);
}
// TODO(benh):
// // Tell the slaves manager to stop monitoring this slave for us.
- // process::dispatch(slavesManager->self(), &SlavesManager::forget,
- // slave->pid, slave->info, slave->slaveId);
+ // dispatch(slavesManager->self(), &SlavesManager::forget,
+ // slave->pid, slave->info, slave->id);
// Kill the slave observer.
- process::post(slave->observer->self(), process::TERMINATE);
- process::wait(slave->observer->self());
+ terminate(slave->observer);
+ wait(slave->observer);
+
delete slave->observer;
// TODO(benh): unlink(slave->pid);
- pidToSlaveId.erase(slave->pid);
// Delete it
- slaves.erase(slave->slaveId);
+ slaves.erase(slave->id);
allocator->slaveRemoved(slave);
delete slave;
}
-// Remove a slot offer (because it was replied or we lost a framework or slave)
+// Remove a task from its framework and slave, and tell the allocator.
-void Master::removeTask(Task* task, TaskRemovalReason reason)
+void Master::removeTask(Framework* framework,
+ Slave* slave,
+ Task* task,
+ TaskRemovalReason reason)
{
- Framework* framework = lookupFramework(task->framework_id());
- Slave* slave = lookupSlave(task->slave_id());
- CHECK(framework != NULL);
- CHECK(slave != NULL);
framework->removeTask(task->task_id());
slave->removeTask(task);
allocator->taskRemoved(task, reason);
@@ -1909,10 +1558,66 @@ void Master::removeTask(Task* task, Task
}
-Allocator* Master::createAllocator()
+// Removes an offer from the master: detaches it from every slave it
+// covers and from its framework, rescinds it unless the framework
+// replied to it, hands `resourcesUnused` back to the allocator, and
+// finally erases and deletes the Offer object.
+void Master::removeOffer(Offer* offer,
+ OfferReturnReason reason,
+ const vector<SlaveResources>& resourcesUnused)
+{
+ // Detach the offer from each slave, subtracting its share from that
+ // slave's resourcesOffered.
+ foreach (SlaveResources& slaveResources, offer->resources) {
+ Slave* slave = slaveResources.slave;
+ CHECK(slave != NULL);
+ slave->resourcesOffered -= slaveResources.resources;
+ slave->offers.erase(offer);
+ }
+
+ // Detach it from the owning framework.
+ Framework* framework = getFramework(offer->frameworkId);
+ CHECK(framework != NULL);
+ framework->removeOffer(offer);
+
+ // No rescind is needed when the framework itself replied to the offer.
+ if (reason != ORR_FRAMEWORK_REPLIED) {
+ RescindResourceOfferMessage rescind;
+ rescind.mutable_offer_id()->MergeFrom(offer->id);
+ send(framework->pid, rescind);
+ }
+
+ // The allocator must see the offer before it is freed below.
+ allocator->offerReturned(offer, reason, resourcesUnused);
+
+ offers.erase(offer->id);
+ delete offer;
+}
+
+
+// Looks up a framework in `frameworks` by id; returns NULL if absent.
+Framework* Master::getFramework(const FrameworkID& frameworkId)
{
- LOG(INFO) << "Creating \"" << allocatorType << "\" allocator";
- return AllocatorFactory::instantiate(allocatorType, this);
+ if (frameworks.count(frameworkId) == 0) {
+ return NULL;
+ }
+ return frameworks[frameworkId];
+}
+
+
+// Looks up a slave in `slaves` by id; returns NULL if absent.
+Slave* Master::getSlave(const SlaveID& slaveId)
+{
+ if (slaves.count(slaveId) == 0) {
+ return NULL;
+ }
+ return slaves[slaveId];
+}
+
+
+// Looks up an offer in `offers` by id; returns NULL if absent.
+Offer* Master::getOffer(const OfferID& offerId)
+{
+ if (offers.count(offerId) == 0) {
+ return NULL;
+ }
+ return offers[offerId];
}
@@ -1921,10 +1626,14 @@ Allocator* Master::createAllocator()
// and FWID is an increasing integer.
FrameworkID Master::newFrameworkId()
{
- ostringstream oss;
- oss << masterId << "-" << setw(4) << setfill('0') << nextFrameworkId++;
+ std::ostringstream out;
+
+ // Zero-pad the counter to at least four digits, e.g. "<masterId>-0007".
+ out << masterId << "-" << std::setw(4)
+ << std::setfill('0') << nextFrameworkId++;
+
FrameworkID frameworkId;
- frameworkId.set_value(oss.str());
+ frameworkId.set_value(out.str());
+
return frameworkId;
}
@@ -1932,7 +1641,7 @@ FrameworkID Master::newFrameworkId()
+// Returns a fresh offer id of the form "<masterId>-<n>", where n is
+// read from (and post-increments) nextOfferId; n is not zero-padded.
OfferID Master::newOfferId()
{
OfferID offerId;
- offerId.set_value(masterId + "-" + lexical_cast<string>(nextOfferId++));
+ offerId.set_value(masterId + "-" + utils::stringify(nextOfferId++));
return offerId;
}
@@ -1940,12 +1649,220 @@ OfferID Master::newOfferId()
+// Returns a fresh slave id of the form "<masterId>-<n>", where n is
+// read from (and post-increments) nextSlaveId; n is not zero-padded.
SlaveID Master::newSlaveId()
{
SlaveID slaveId;
- slaveId.set_value(masterId + "-" + lexical_cast<string>(nextSlaveId++));
+ slaveId.set_value(masterId + "-" + utils::stringify(nextSlaveId++));
return slaveId;
}
-const Configuration& Master::getConfiguration()
+Promise<HttpResponse> Master::http_info_json(const HttpRequest& request)
{
- return conf;
+ LOG(INFO) << "HTTP request for '/master/info.json'";
+
+ // A single JSON object with this master's build date/user, start
+ // time and pid.
+ std::ostringstream json;
+ json << "{"
+ << "\"built_date\":\"" << build::DATE << "\","
+ << "\"build_user\":\"" << build::USER << "\","
+ << "\"start_time\":\"" << startTime << "\","
+ << "\"pid\":\"" << self() << "\""
+ << "}";
+
+ const string body = json.str();
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = utils::stringify(body.size());
+ response.body = body.data();
+ return response;
+}
+
+
+Promise<HttpResponse> Master::http_frameworks_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/master/frameworks.json'";
+
+ std::ostringstream out;
+
+ // A JSON array with one object per registered framework. The
+ // separator is written before every element except the first, so
+ // there is never a trailing comma to back up over.
+ const char* separator = "";
+
+ out << "[";
+
+ foreachvalue (Framework* framework, frameworks) {
+ out << separator <<
+ "{" <<
+ "\"id\":\"" << framework->id << "\"," <<
+ "\"name\":\"" << framework->info.name() << "\"," <<
+ "\"user\":\"" << framework->info.user() << "\"" <<
+ "}";
+ separator = ",";
+ }
+
+ out << "]";
+
+ const string body = out.str();
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = utils::stringify(body.size());
+ response.body = body.data();
+ return response;
}
+
+
+Promise<HttpResponse> Master::http_slaves_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/master/slaves.json'";
+
+ std::ostringstream out;
+
+ // A JSON array with one object per connected slave; separators are
+ // written before every element except the first.
+ const char* separator = "";
+
+ out << "[";
+
+ foreachvalue (Slave* slave, slaves) {
+ // TODO(benh): Send all of the resources (as JSON).
+ Resources resources(slave->info.resources());
+ Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+ Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+
+ out << separator <<
+ "{" <<
+ "\"id\":\"" << slave->id << "\"," <<
+ "\"hostname\":\"" << slave->info.hostname() << "\"," <<
+ "\"cpus\":" << cpus.value() << "," <<
+ "\"mem\":" << mem.value() <<
+ "}";
+ separator = ",";
+ }
+
+ out << "]";
+
+ const string body = out.str();
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = utils::stringify(body.size());
+ response.body = body.data();
+ return response;
+}
+
+
+Promise<HttpResponse> Master::http_tasks_json(const HttpRequest& request)
+{
+ LOG(INFO) << "HTTP request for '/master/tasks.json'";
+
+ std::ostringstream out;
+
+ // A JSON array with one object per task of every registered framework.
+ // The separator is written before every element except the first, so
+ // the output stays well-formed even when frameworks are registered but
+ // none of them has any tasks (backing up over a "trailing comma" in
+ // that case would overwrite the opening '[').
+ const char* separator = "";
+
+ out << "[";
+
+ foreachvalue (Framework* framework, frameworks) {
+ foreachvalue (Task* task, framework->tasks) {
+ // TODO(benh): Send all of the resources (as JSON).
+ Resources resources(task->resources());
+ Resource::Scalar cpus = resources.getScalar("cpus", Resource::Scalar());
+ Resource::Scalar mem = resources.getScalar("mem", Resource::Scalar());
+
+ // NOTE(review): string fields (e.g. the task name) are written
+ // without JSON escaping; a '"' or '\' in them yields invalid JSON.
+ out << separator <<
+ "{" <<
+ "\"task_id\":\"" << task->task_id() << "\"," <<
+ "\"framework_id\":\"" << task->framework_id() << "\"," <<
+ "\"slave_id\":\"" << task->slave_id() << "\"," <<
+ "\"name\":\"" << task->name() << "\"," <<
+ "\"state\":\"" << task->state() << "\"," <<
+ "\"cpus\":" << cpus.value() << "," <<
+ "\"mem\":" << mem.value() <<
+ "}";
+ separator = ",";
+ }
+ }
+
+ out << "]";
+
+ const string body = out.str();
+
+ HttpOKResponse response;
+ response.headers["Content-Type"] = "text/x-json;charset=UTF-8";
+ response.headers["Content-Length"] = utils::stringify(body.size());
+ response.body = body.data();
+ return response;
+}
+
+
+Promise<HttpResponse> Master::http_stats_json(const HttpRequest& request)
+{
+ LOG(INFO) << "Http request for '/master/stats.json'";
+
+ std::ostringstream out;
+
+ out << std::setprecision(10);
+
+ out <<
+ "{" <<
+ "\"uptime\":" << elapsedTime() - startTime << "," <<
+ "\"total_schedulers\":" << frameworks.size() << "," <<
+ "\"active_schedulers\":" << getActiveFrameworks().size() << "," <<
+ "\"activated_slaves\":" << slaveHostnamePorts.size() << "," <<
+ "\"connected_slaves\":" << slaves.size() << "," <<
+ "\"started_tasks\":" << stats.tasks[TASK_STARTING] << "," <<
+ "\"finished_tasks\":" << stats.tasks[TASK_FINISHED] << "," <<
+ "\"killed_tasks\":" << stats.tasks[TASK_KILLED] << "," <<
+ "\"failed_tasks\":" << stats.tasks[TASK_FAILED] << "," <<
+ "\"lost_tasks\":" << stats.tasks[TASK_LOST] << "," <<
+ "\"valid_status_updates\":" << stats.validStatusUpdates << "," <<
+ "\"invalid_status_updates\":" << stats.invalidStatusUpdates << "," <<
+ "\"valid_framework_messages\":" << stats.validFrameworkMessages << "," <<
+ "\"invalid_framework_messages\":" << stats.invalidFrameworkMessages <<
+ "}";
+
+ HttpOKResponse response;
[... 50 lines stripped ...]