Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/27 08:08:39 UTC
svn commit: r1140024 [4/15] - in /incubator/mesos/trunk: ./ ec2/
ec2/deploy.karmic64/ ec2/deploy.solaris/ frameworks/torque/nexus-hpl/
include/mesos/ src/ src/common/ src/configurator/ src/detector/
src/examples/ src/examples/java/ src/examples/python/...
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Mon Jun 27 06:08:33 2011
@@ -4,29 +4,40 @@
#include <string>
#include <vector>
-#include <glog/logging.h>
-
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
#include <process/process.hpp>
+#include <process/protobuf.hpp>
#include "state.hpp"
#include "common/foreach.hpp"
+#include "common/hashmap.hpp"
+#include "common/hashset.hpp"
#include "common/multimap.hpp"
#include "common/resources.hpp"
#include "common/type_utils.hpp"
+#include "common/units.hpp"
+#include "common/utils.hpp"
#include "configurator/configurator.hpp"
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
namespace mesos { namespace internal { namespace master {
-using foreach::_;
+using namespace process;
+// Some forward declarations.
+class Allocator;
+class SlavesManager;
+struct Framework;
+struct Slave;
+struct SlaveResources;
+class SlaveObserver;
+struct Offer;
+
+// TODO(benh): Add units after constants.
+// TODO(benh): Also make configuration options be constants.
// Maximum number of slot offers to have outstanding for each framework.
const int MAX_OFFERS_PER_FRAMEWORK = 50;
@@ -53,7 +64,7 @@ const double SLAVE_PONG_TIMEOUT = 15.0;
const int MAX_SLAVE_TIMEOUTS = 5;
// Time to wait for a framework to failover (TODO(benh): Make configurable)).
-const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
+const double FRAMEWORK_FAILOVER_TIMEOUT = 60 * 60 * 24;
// Reasons why offers might be returned to the Allocator.
@@ -77,17 +88,7 @@ enum TaskRemovalReason
};
-// Some forward declarations.
-class Allocator;
-class SlavesManager;
-struct Framework;
-struct Slave;
-struct SlaveResources;
-class SlaveObserver;
-struct SlotOffer;
-
-
-class Master : public MesosProcess<Master>
+class Master : public ProtobufProcess<Master>
{
public:
Master();
@@ -97,18 +98,9 @@ public:
static void registerOptions(Configurator* configurator);
- process::Promise<state::MasterState*> getState();
-
- OfferID makeOffer(Framework* framework,
- const std::vector<SlaveResources>& resources);
-
- // Return connected frameworks that are not in the process of being removed
- std::vector<Framework*> getActiveFrameworks();
-
- // Return connected slaves that are not in the process of being removed
- std::vector<Slave*> getActiveSlaves();
+ Promise<state::MasterState*> getState();
- void newMasterDetected(const std::string& pid);
+ void newMasterDetected(const UPID& pid);
void noMasterDetected();
void masterDetectionFailure();
void registerFramework(const FrameworkInfo& frameworkInfo);
@@ -121,22 +113,17 @@ public:
const std::vector<TaskDescription>& tasks,
const Params& params);
void reviveOffers(const FrameworkID& frameworkId);
- void killTask(const FrameworkID& frameworkId,
- const TaskID& taskId);
+ void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
void schedulerMessage(const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
const std::string& data);
- void statusUpdateAck(const FrameworkID& frameworkId,
- const TaskID& taskId,
- const SlaveID& slaveId);
void registerSlave(const SlaveInfo& slaveInfo);
void reregisterSlave(const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const std::vector<Task>& tasks);
void unregisterSlave(const SlaveID& slaveId);
- void statusUpdate(const FrameworkID& frameworkId,
- const TaskStatus& status);
+ void statusUpdate(const StatusUpdate& update, const UPID& pid);
void executorMessage(const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
@@ -144,102 +131,101 @@ public:
void exitedExecutor(const SlaveID& slaveId,
const FrameworkID& frameworkId,
const ExecutorID& executorId,
- int32_t result);
+ int32_t status);
void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
void timerTick();
- void frameworkExpired(const FrameworkID& frameworkId);
+ void frameworkFailoverTimeout(const FrameworkID& frameworkId,
+ double reregisteredTime);
void exited();
- Framework* lookupFramework(const FrameworkID& frameworkId);
- Slave* lookupSlave(const SlaveID& slaveId);
- SlotOffer* lookupSlotOffer(const OfferID& offerId);
+ // Return connected frameworks that are not in the process of being removed
+ std::vector<Framework*> getActiveFrameworks();
+
+ // Return connected slaves that are not in the process of being removed
+ std::vector<Slave*> getActiveSlaves();
+
+ OfferID makeOffer(Framework* framework,
+ const std::vector<SlaveResources>& resources);
protected:
virtual void operator () ();
void initialize();
- // Process a resource offer reply (for a non-cancelled offer) by launching
- // the desired tasks (if the offer contains a valid set of tasks) and
- // reporting any unused resources to the allocator
- void processOfferReply(SlotOffer* offer,
+ // Process a resource offer reply (for a non-cancelled offer) by
+ // launching the desired tasks (if the offer contains a valid set of
+ // tasks) and reporting any unused resources to the allocator
+ void processOfferReply(Offer* offer,
const std::vector<TaskDescription>& tasks,
const Params& params);
- // Launch a task described in a slot offer response
+ // Launch a task described in an offer response.
void launchTask(Framework* framework, const TaskDescription& task);
+ void addFramework(Framework* framework);
+
+ // Replace the scheduler for a framework with a new process ID, in
+ // the event of a scheduler failover.
+ void failoverFramework(Framework* framework, const UPID& newPid);
+
// Terminate a framework, sending it a particular error message
// TODO: Make the error codes and messages programmer-friendly
void terminateFramework(Framework* framework,
int32_t code,
- const std::string& message);
-
- // Remove a slot offer (because it was replied to, or we want to rescind it,
- // or we lost a framework or a slave)
- void removeSlotOffer(SlotOffer* offer,
- OfferReturnReason reason,
- const std::vector<SlaveResources>& resourcesLeft);
-
- void removeTask(Task* task, TaskRemovalReason reason);
-
- void addFramework(Framework* framework);
-
- // Replace the scheduler for a framework with a new process ID, in the
- // event of a scheduler failover.
- void failoverFramework(Framework* framework, const process::UPID& newPid);
+ const std::string& error);
// Kill all of a framework's tasks, delete the framework object, and
// reschedule slot offers for slots that were assigned to this framework
void removeFramework(Framework* framework);
// Add a slave.
- void addSlave(Slave* slave);
+ void addSlave(Slave* slave, bool reregister = false);
void readdSlave(Slave* slave, const std::vector<Task>& tasks);
// Lose all of a slave's tasks and delete the slave object
void removeSlave(Slave* slave);
- virtual Allocator* createAllocator();
+ void removeTask(Framework* framework,
+ Slave* slave,
+ Task* task,
+ TaskRemovalReason reason);
+
+ // Remove a slot offer (because it was replied to, or we want to rescind it,
+ // or we lost a framework or a slave)
+ void removeOffer(Offer* offer,
+ OfferReturnReason reason,
+ const std::vector<SlaveResources>& resourcesLeft);
+
+ Framework* getFramework(const FrameworkID& frameworkId);
+ Slave* getSlave(const SlaveID& slaveId);
+ Offer* getOffer(const OfferID& offerId);
FrameworkID newFrameworkId();
OfferID newOfferId();
SlaveID newSlaveId();
- const Configuration& getConfiguration();
-
private:
+ friend struct SlaveRegistrar;
+ friend struct SlaveReregistrar;
+ // TODO(benh): Remove once SimpleAllocator doesn't use Master::get*.
+ friend class SimpleAllocator;
+
// TODO(benh): Better naming and name scope for these http handlers.
- process::Promise<process::HttpResponse> http_info_json(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> http_frameworks_json(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> http_slaves_json(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> http_tasks_json(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> http_stats_json(const process::HttpRequest& request);
- process::Promise<process::HttpResponse> http_vars(const process::HttpRequest& request);
+ Promise<HttpResponse> http_info_json(const HttpRequest& request);
+ Promise<HttpResponse> http_frameworks_json(const HttpRequest& request);
+ Promise<HttpResponse> http_slaves_json(const HttpRequest& request);
+ Promise<HttpResponse> http_tasks_json(const HttpRequest& request);
+ Promise<HttpResponse> http_stats_json(const HttpRequest& request);
+ Promise<HttpResponse> http_vars(const HttpRequest& request);
const Configuration conf;
- SlavesManager* slavesManager;
-
- multimap<std::string, uint16_t> slaveHostnamePorts;
-
- boost::unordered_map<FrameworkID, Framework*> frameworks;
- boost::unordered_map<SlaveID, Slave*> slaves;
- boost::unordered_map<OfferID, SlotOffer*> slotOffers;
-
- boost::unordered_map<process::UPID, FrameworkID> pidToFrameworkId;
- boost::unordered_map<process::UPID, SlaveID> pidToSlaveId;
-
- int64_t nextFrameworkId; // Used to give each framework a unique ID.
- int64_t nextOfferId; // Used to give each slot offer a unique ID.
- int64_t nextSlaveId; // Used to give each slave a unique ID.
+ bool active;
- std::string allocatorType;
Allocator* allocator;
-
- bool active;
+ SlavesManager* slavesManager;
// Contains the date the master was launched and
// some ephemeral token (e.g. returned from
@@ -247,18 +233,24 @@ private:
// created by this master.
std::string masterId;
+ multimap<std::string, uint16_t> slaveHostnamePorts;
+
+ hashmap<FrameworkID, Framework*> frameworks;
+ hashmap<SlaveID, Slave*> slaves;
+ hashmap<OfferID, Offer*> offers;
+
+ int64_t nextFrameworkId; // Used to give each framework a unique ID.
+ int64_t nextOfferId; // Used to give each slot offer a unique ID.
+ int64_t nextSlaveId; // Used to give each slave a unique ID.
+
// Statistics (initialized in Master::initialize).
struct {
- uint64_t launched_tasks;
- uint64_t finished_tasks;
- uint64_t killed_tasks;
- uint64_t failed_tasks;
- uint64_t lost_tasks;
- uint64_t valid_status_updates;
- uint64_t invalid_status_updates;
- uint64_t valid_framework_messages;
- uint64_t invalid_framework_messages;
- } statistics;
+ uint64_t tasks[TaskState_ARRAYSIZE];
+ uint64_t validStatusUpdates;
+ uint64_t invalidStatusUpdates;
+ uint64_t validFrameworkMessages;
+ uint64_t invalidFrameworkMessages;
+ } stats;
// Start time used to calculate uptime.
double startTime;
@@ -266,49 +258,40 @@ private:
// A resource offer.
-struct SlotOffer
+struct Offer
{
- OfferID offerId;
- FrameworkID frameworkId;
- std::vector<SlaveResources> resources;
+ Offer(const OfferID& _id,
+ const FrameworkID& _frameworkId,
+ const std::vector<SlaveResources>& _resources)
+ : id(_id), frameworkId(_frameworkId), resources(_resources) {}
- SlotOffer(const OfferID& _offerId,
- const FrameworkID& _frameworkId,
- const std::vector<SlaveResources>& _resources)
- : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+ const OfferID id;
+ const FrameworkID frameworkId;
+ std::vector<SlaveResources> resources;
};
// A connected slave.
struct Slave
{
- SlaveInfo info;
- SlaveID slaveId;
- process::UPID pid;
-
- bool active; // Turns false when slave is being removed
- double connectTime;
- double lastHeartbeat;
-
- Resources resourcesOffered; // Resources currently in offers
- Resources resourcesInUse; // Resources currently used by tasks
-
- boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
- boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-
- SlaveObserver* observer;
-
- Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
- const process::UPID& _pid, double time)
- : info(_info), slaveId(_slaveId), pid(_pid), active(true),
- connectTime(time), lastHeartbeat(time) {}
+ Slave(const SlaveInfo& _info,
+ const SlaveID& _id,
+ const UPID& _pid,
+ double time)
+ : info(_info),
+ id(_id),
+ pid(_pid),
+ active(true),
+ registeredTime(time),
+ lastHeartbeat(time) {}
~Slave() {}
- Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+ Task* getTask(const FrameworkID& frameworkId, const TaskID& taskId)
{
- foreachpair (_, Task* task, tasks) {
- if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+ foreachvalue (Task* task, tasks) {
+ if (task->framework_id() == frameworkId &&
+ task->task_id() == taskId) {
return task;
}
}
@@ -346,6 +329,23 @@ struct Slave
}
return resources - (resourcesOffered + resourcesInUse);
}
+
+ const SlaveID id;
+ const SlaveInfo info;
+
+ UPID pid;
+
+ bool active; // Turns false when slave is being removed
+ double registeredTime;
+ double lastHeartbeat;
+
+ Resources resourcesOffered; // Resources currently in offers
+ Resources resourcesInUse; // Resources currently used by tasks
+
+ hashmap<std::pair<FrameworkID, TaskID>, Task*> tasks;
+ hashset<Offer*> offers; // Active offers on this slave.
+
+ SlaveObserver* observer;
};
@@ -360,71 +360,17 @@ struct SlaveResources
};
-class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
-{
-public:
- FrameworkFailoverTimer(const process::PID<Master>& _master,
- const FrameworkID& _frameworkId)
- : master(_master), frameworkId(_frameworkId) {}
-
-protected:
- virtual void operator () ()
- {
- link(master);
- while (true) {
- receive(FRAMEWORK_FAILOVER_TIMEOUT);
- if (name() == process::TIMEOUT) {
- process::dispatch(master, &Master::frameworkExpired, frameworkId);
- return;
- } else if (name() == process::EXITED || name() == process::TERMINATE) {
- return;
- }
- }
- }
-
-private:
- const process::PID<Master> master;
- const FrameworkID frameworkId;
-};
-
-
// A connected framework.
struct Framework
{
- FrameworkInfo info;
- FrameworkID frameworkId;
- process::UPID pid;
-
- bool active; // Turns false when framework is being removed
- double connectTime;
-
- boost::unordered_map<TaskID, Task*> tasks;
- boost::unordered_set<SlotOffer*> slotOffers; // Active offers for framework.
-
- Resources resources; // Total resources owned by framework (tasks + offers)
-
- // Contains a time of unfiltering for each slave we've filtered,
- // or 0 for slaves that we want to keep filtered forever
- boost::unordered_map<Slave*, double> slaveFilter;
-
- // A failover timer if the connection to this framework is lost.
- FrameworkFailoverTimer* failoverTimer;
+ Framework(const FrameworkInfo& _info, const FrameworkID& _id,
+ const UPID& _pid, double time)
+ : info(_info), id(_id), pid(_pid), active(true),
+ registeredTime(time), reregisteredTime(time) {}
- Framework(const FrameworkInfo& _info, const FrameworkID& _frameworkId,
- const process::UPID& _pid, double time)
- : info(_info), frameworkId(_frameworkId), pid(_pid), active(true),
- connectTime(time), failoverTimer(NULL) {}
-
- ~Framework()
- {
- if (failoverTimer != NULL) {
- process::post(failoverTimer->self(), process::TERMINATE);
- process::wait(failoverTimer->self());
- delete failoverTimer;
- }
- }
+ ~Framework() {}
- Task* lookupTask(const TaskID& taskId)
+ Task* getTask(const TaskID& taskId)
{
if (tasks.count(taskId) > 0) {
return tasks[taskId];
@@ -452,19 +398,19 @@ struct Framework
tasks.erase(taskId);
}
- void addOffer(SlotOffer* offer)
+ void addOffer(Offer* offer)
{
- CHECK(slotOffers.count(offer) == 0);
- slotOffers.insert(offer);
+ CHECK(offers.count(offer) == 0);
+ offers.insert(offer);
foreach (const SlaveResources& sr, offer->resources) {
resources += sr.resources;
}
}
- void removeOffer(SlotOffer* offer)
+ void removeOffer(Offer* offer)
{
- CHECK(slotOffers.find(offer) != slotOffers.end());
- slotOffers.erase(offer);
+ CHECK(offers.find(offer) != offers.end());
+ offers.erase(offer);
foreach (const SlaveResources& sr, offer->resources) {
resources -= sr.resources;
}
@@ -478,34 +424,52 @@ struct Framework
void removeExpiredFilters(double now)
{
- foreachpaircopy (Slave* slave, double removalTime, slaveFilter) {
+ foreachpair (Slave* slave, double removalTime, utils::copy(slaveFilter)) {
if (removalTime != 0 && removalTime <= now) {
slaveFilter.erase(slave);
}
}
}
+
+ const FrameworkID id;
+ const FrameworkInfo info;
+
+ UPID pid;
+
+ bool active; // Turns false when framework is being removed
+ double registeredTime;
+ double reregisteredTime;
+
+ hashmap<TaskID, Task*> tasks;
+ hashset<Offer*> offers; // Active offers for framework.
+
+ Resources resources; // Total resources owned by framework (tasks + offers)
+
+ // Contains a time of unfiltering for each slave we've filtered,
+ // or 0 for slaves that we want to keep filtered forever
+ hashmap<Slave*, double> slaveFilter;
};
-// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
+// Pretty-printing of Offers, Tasks, Frameworks, Slaves, etc.
-inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)
+inline std::ostream& operator << (std::ostream& stream, const Offer *o)
{
- stream << "offer " << o->offerId;
+ stream << "offer " << o->id;
return stream;
}
inline std::ostream& operator << (std::ostream& stream, const Slave *s)
{
- stream << "slave " << s->slaveId;
+ stream << "slave " << s->id;
return stream;
}
inline std::ostream& operator << (std::ostream& stream, const Framework *f)
{
- stream << "framework " << f->frameworkId;
+ stream << "framework " << f->id;
return stream;
}
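
Note on the renamed accessors above: lookupFramework/lookupSlave/lookupSlotOffer become getFramework/getSlave/getOffer, and the boost::unordered_* containers become the project's hashmap/hashset wrappers. The definitions live in master.cpp (not shown in this part of the commit); as a minimal sketch of what the renamed lookup plausibly looks like, assuming hashmap keeps the boost::unordered_map-style count()/operator[] interface it wraps:

    Framework* Master::getFramework(const FrameworkID& frameworkId)
    {
      // NULL if the framework is unknown, mirroring the old lookupFramework.
      return frameworks.count(frameworkId) > 0 ? frameworks[frameworkId] : NULL;
    }
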
Modified: incubator/mesos/trunk/src/master/simple_allocator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.cpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.cpp Mon Jun 27 06:08:33 2011
@@ -56,7 +56,7 @@ void SimpleAllocator::taskRemoved(Task*
{
LOG(INFO) << "Removed " << task;
// Remove all refusers from this slave since it has more resources free
- Slave* slave = master->lookupSlave(task->slave_id());
+ Slave* slave = master->getSlave(task->slave_id());
CHECK(slave != 0);
refusers[slave].clear();
// Re-offer the resources, unless this task was removed due to a lost
@@ -66,7 +66,7 @@ void SimpleAllocator::taskRemoved(Task*
}
-void SimpleAllocator::offerReturned(SlotOffer* offer,
+void SimpleAllocator::offerReturned(Offer* offer,
OfferReturnReason reason,
const vector<SlaveResources>& resLeft)
{
@@ -74,7 +74,7 @@ void SimpleAllocator::offerReturned(Slot
// If this offer returned due to the framework replying, add it to refusers.
if (reason == ORR_FRAMEWORK_REPLIED) {
- Framework* framework = master->lookupFramework(offer->frameworkId);
+ Framework* framework = master->getFramework(offer->frameworkId);
CHECK(framework != 0);
foreach (const SlaveResources& r, resLeft) {
VLOG(1) << "Framework reply leaves " << r.resources.allocatable()
@@ -148,7 +148,7 @@ struct DominantShareComparator
if (share1 == share2)
// Make the sort deterministic for unit testing.
- return f1->frameworkId.value() < f2->frameworkId.value();
+ return f1->id.value() < f2->id.value();
else
return share1 < share2;
}
@@ -235,7 +235,7 @@ void SimpleAllocator::makeNewOffers(cons
}
// Clear refusers on any slave that has been refused by everyone
- foreachpair (Slave* slave, _, freeResources) {
+ foreachkey (Slave* slave, freeResources) {
unordered_set<Framework*>& refs = refusers[slave];
if (refs.size() == ordering.size()) {
VLOG(1) << "Clearing refusers for " << slave
@@ -251,7 +251,7 @@ void SimpleAllocator::makeNewOffers(cons
if (refusers[slave].find(framework) == refusers[slave].end() &&
!framework->filters(slave, resources)) {
VLOG(1) << "Offering " << resources << " on " << slave
- << " to framework " << framework->frameworkId;
+ << " to framework " << framework->id;
offerable.push_back(SlaveResources(slave, resources));
}
}
Modified: incubator/mesos/trunk/src/master/simple_allocator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/simple_allocator.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/simple_allocator.hpp (original)
+++ incubator/mesos/trunk/src/master/simple_allocator.hpp Mon Jun 27 06:08:33 2011
@@ -6,7 +6,7 @@
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
-#include "messaging/messages.pb.h"
+#include "messages/messages.pb.h"
#include "allocator.hpp"
@@ -38,7 +38,7 @@ public:
virtual void taskRemoved(Task* task, TaskRemovalReason reason);
- virtual void offerReturned(SlotOffer* offer,
+ virtual void offerReturned(Offer* offer,
OfferReturnReason reason,
const std::vector<SlaveResources>& resourcesLeft);
Modified: incubator/mesos/trunk/src/master/slaves_manager.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/slaves_manager.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/slaves_manager.cpp (original)
+++ incubator/mesos/trunk/src/master/slaves_manager.cpp Mon Jun 27 06:08:33 2011
@@ -1,9 +1,11 @@
+#include <glog/logging.h>
+
#include <map>
#include <sstream>
#include <boost/lexical_cast.hpp>
-#include <glog/logging.h>
+#include <process/dispatch.hpp>
#include "config/config.hpp"
@@ -568,7 +570,7 @@ bool ZooKeeperSlavesManagerStorage::pars
SlavesManager::SlavesManager(const Configuration& conf,
const PID<Master>& _master)
- : process::Process<SlavesManager>("slaves"),
+ : process::ProcessBase("slaves"),
master(_master)
{
// Create the slave manager storage based on configuration.
@@ -754,7 +756,7 @@ void SlavesManager::updateActive(const m
{
// Loop through the current active slave hostname:port pairs and
// remove all that are not found in updated.
- foreachpaircopy (const string& hostname, uint16_t port, active) {
+ foreachpair (const string& hostname, uint16_t port, utils::copy(active)) {
if (updated.count(hostname, port) == 0) {
process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
hostname, port);
Modified: incubator/mesos/trunk/src/master/state.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/state.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/state.hpp (original)
+++ incubator/mesos/trunk/src/master/state.hpp Mon Jun 27 06:08:33 2011
@@ -19,24 +19,24 @@ namespace mesos { namespace internal { n
struct SlaveResources
{
std::string slave_id;
- int32_t cpus;
- int64_t mem;
+ double cpus;
+ double mem;
- SlaveResources(std::string _slaveId, int32_t _cpus, int64_t _mem)
+ SlaveResources(std::string _slaveId, double _cpus, double _mem)
: slave_id(_slaveId), cpus(_cpus), mem(_mem) {}
};
-struct SlotOffer
+struct Offer
{
std::string id;
std::string framework_id;
std::vector<SlaveResources *> resources;
- SlotOffer(std::string _id, std::string _frameworkId)
+ Offer(std::string _id, std::string _frameworkId)
: id(_id), framework_id(_frameworkId) {}
- ~SlotOffer()
+ ~Offer()
{
foreach (SlaveResources *sr, resources)
delete sr;
@@ -48,7 +48,7 @@ struct Slave
{
Slave(std::string id_, const std::string& host_,
const std::string& web_ui_url_,
- int32_t cpus_, int64_t mem_, time_t connect_)
+ double cpus_, double mem_, double connect_)
: id(id_), host(host_), web_ui_url(web_ui_url_),
cpus(cpus_), mem(mem_), connect_time(connect_) {}
@@ -57,16 +57,16 @@ struct Slave
std::string id;
std::string host;
std::string web_ui_url;
- int32_t cpus;
- int64_t mem;
- int64_t connect_time;
+ double cpus;
+ double mem;
+ double connect_time;
};
struct Task
{
Task(std::string id_, const std::string& name_, std::string framework_id_,
- std::string slaveId_, std::string state_, int32_t _cpus, int64_t _mem)
+ std::string slaveId_, std::string state_, double _cpus, double _mem)
: id(id_), name(name_), framework_id(framework_id_), slave_id(slaveId_),
state(state_), cpus(_cpus), mem(_mem) {}
@@ -77,8 +77,8 @@ struct Task
std::string framework_id;
std::string slave_id;
std::string state;
- int32_t cpus;
- int64_t mem;
+ double cpus;
+ double mem;
};
@@ -86,7 +86,7 @@ struct Framework
{
Framework(std::string id_, const std::string& user_,
const std::string& name_, const std::string& executor_,
- int32_t cpus_, int64_t mem_, time_t connect_)
+ double cpus_, double mem_, double connect_)
: id(id_), user(user_), name(name_), executor(executor_),
cpus(cpus_), mem(mem_), connect_time(connect_) {}
@@ -96,7 +96,7 @@ struct Framework
{
foreach (Task *task, tasks)
delete task;
- foreach (SlotOffer *offer, offers)
+ foreach (Offer *offer, offers)
delete offer;
}
@@ -104,12 +104,12 @@ struct Framework
std::string user;
std::string name;
std::string executor;
- int32_t cpus;
- int64_t mem;
- int64_t connect_time;
+ double cpus;
+ double mem;
+ double connect_time;
std::vector<Task *> tasks;
- std::vector<SlotOffer *> offers;
+ std::vector<Offer *> offers;
};
Modified: incubator/mesos/trunk/src/master/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/webui.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/webui.cpp (original)
+++ incubator/mesos/trunk/src/master/webui.cpp Mon Jun 27 06:08:33 2011
@@ -3,6 +3,8 @@
#include <sstream>
#include <string>
+#include <process/dispatch.hpp>
+
#include "state.hpp"
#include "webui.hpp"
Added: incubator/mesos/trunk/src/messages/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.hpp?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.hpp (added)
+++ incubator/mesos/trunk/src/messages/messages.hpp Mon Jun 27 06:08:33 2011
@@ -0,0 +1,6 @@
+#ifndef __MESSAGES_HPP__
+#define __MESSAGES_HPP__
+
+#include "messages/messages.pb.h"
+
+#endif // __MESSAGES_HPP__
Added: incubator/mesos/trunk/src/messages/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages/messages.proto?rev=1140024&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messages/messages.proto (added)
+++ incubator/mesos/trunk/src/messages/messages.proto Mon Jun 27 06:08:33 2011
@@ -0,0 +1,245 @@
+import "mesos.proto";
+
+package mesos.internal;
+
+// TODO(benh): Provide comments for each of these messages. Also,
+// consider splitting these messages into different "packages" which
+// represent which messages get handled by which components (e.g., the
+// "mesos.internal.executor" package includes messages that the
+// executor handles).
+
+
+// TODO(benh): It would be great if this could just be a
+// TaskDescription wherever it gets used! However, doing so would
+// require adding the framework_id field, the executor_id field, and
+// the state field into TaskDescription though (or send them another
+// way). Also, one performance reason why we don't do that now is
+// because storing whatever data is coupled with a TaskDescription
+// could be large and unnecessary.
+message Task {
+ required string name = 1;
+ required TaskID task_id = 2;
+ required FrameworkID framework_id = 3;
+ required ExecutorID executor_id = 4;
+ required SlaveID slave_id = 5;
+ required TaskState state = 6;
+ repeated Resource resources = 7;
+}
+
+
+message StatusUpdate {
+ required FrameworkID framework_id = 1;
+ optional ExecutorID executor_id = 2;
+ optional SlaveID slave_id = 3;
+ required TaskStatus status = 4;
+ required double timestamp = 5;
+ required bytes uuid = 6;
+}
+
+
+message ResourceOffer {
+ required SlaveInfo slave = 1;
+ repeated Resource resources = 2;
+}
+
+
+message ExecutorToFrameworkMessage {
+ required SlaveID slave_id = 1;
+ required FrameworkID framework_id = 2;
+ required ExecutorID executor_id = 3;
+ required bytes data = 4;
+}
+
+
+message FrameworkToExecutorMessage {
+ required SlaveID slave_id = 1;
+ required FrameworkID framework_id = 2;
+ required ExecutorID executor_id = 3;
+ required bytes data = 4;
+}
+
+
+message RegisterFrameworkMessage {
+ required FrameworkInfo framework = 1;
+}
+
+
+message ReregisterFrameworkMessage {
+ required FrameworkID framework_id = 1;
+ required FrameworkInfo framework = 2;
+ required int32 generation = 3;
+}
+
+
+message FrameworkRegisteredMessage {
+ required FrameworkID framework_id = 1;
+}
+
+
+message UnregisterFrameworkMessage {
+ required FrameworkID framework_id = 1;
+}
+
+
+message ResourceOfferMessage {
+ required OfferID offer_id = 1;
+ repeated SlaveOffer offers = 2;
+ repeated string pids = 3;
+}
+
+
+message ResourceOfferReplyMessage {
+ required FrameworkID framework_id = 1;
+ required OfferID offer_id = 2;
+ repeated TaskDescription tasks = 3;
+ optional Params params = 4;
+}
+
+
+message RescindResourceOfferMessage {
+ required OfferID offer_id = 1;
+}
+
+
+message ReviveOffersMessage {
+ required FrameworkID framework_id = 1;
+}
+
+
+message RunTaskMessage {
+ required FrameworkID framework_id = 1;
+ required FrameworkInfo framework = 2;
+ required string pid = 3;
+ required TaskDescription task = 4;
+}
+
+
+message KillTaskMessage {
+ required FrameworkID framework_id = 1;
+ required TaskID task_id = 2;
+}
+
+
+message StatusUpdateMessage {
+ required StatusUpdate update = 1;
+ optional string pid = 2;
+}
+
+
+message StatusUpdateAcknowledgementMessage {
+ required SlaveID slave_id = 1;
+ required FrameworkID framework_id = 2;
+ required TaskID task_id = 3;
+ required bytes uuid = 4;
+}
+
+
+message LostSlaveMessage {
+ required SlaveID slave_id = 1;
+}
+
+
+message FrameworkErrorMessage {
+ required int32 code = 1;
+ required string message = 2;
+}
+
+
+message RegisterSlaveMessage {
+ required SlaveInfo slave = 1;
+}
+
+
+message ReregisterSlaveMessage {
+ required SlaveID slave_id = 1;
+ required SlaveInfo slave = 2;
+ repeated Task tasks = 3;
+}
+
+
+message SlaveRegisteredMessage {
+ required SlaveID slave_id = 1;
+}
+
+
+message SlaveReregisteredMessage {
+ required SlaveID slave_id = 1;
+}
+
+
+message UnregisterSlaveMessage {
+ required SlaveID slave_id = 1;
+}
+
+
+message HeartbeatMessage {
+ required SlaveID slave_id = 1;
+}
+
+
+// Tells a slave to shut down all executors of the given framework.
+message ShutdownFrameworkMessage {
+ required FrameworkID framework_id = 1;
+}
+
+
+// Tells the executor to initiate a shut down by invoking
+// Executor::shutdown.
+message ShutdownExecutorMessage {}
+
+
+message UpdateFrameworkMessage {
+ required FrameworkID framework_id = 1;
+ required string pid = 2;
+}
+
+
+message RegisterExecutorMessage {
+ required FrameworkID framework_id = 1;
+ required ExecutorID executor_id = 2;
+}
+
+
+message ExecutorRegisteredMessage {
+ required ExecutorArgs args = 1;
+}
+
+
+message ExitedExecutorMessage {
+ required SlaveID slave_id = 1;
+ required FrameworkID framework_id = 2;
+ required ExecutorID executor_id = 3;
+ required int32 status = 4;
+}
+
+
+message RegisterProjdMessage {
+ required string project = 1;
+}
+
+
+message ProjdReadyMessage {
+ required string project = 1;
+}
+
+
+message ProjdUpdateResourcesMessage {
+ optional Params params = 1;
+}
+
+
+message FrameworkExpiredMessage {
+ required FrameworkID framework_id = 1;
+}
+
+
+message NoMasterDetectedMessage {}
+
+message NewMasterDetectedMessage {
+ required string pid = 2;
+}
+
+
+message GotMasterTokenMessage {
+ required string token = 1;
+}
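
These messages replace the old MSG<...> tagged wrappers: both ends now construct the protobuf directly and hand it to ProtobufProcess::send, as the scheduler changes below illustrate. A representative build-and-send sketch using one of the new messages (frameworkId, taskId, and master are assumed to be in scope inside a ProtobufProcess subclass):

    // Kill a task: fill in the identifiers and send to the master's UPID.
    KillTaskMessage message;
    message.mutable_framework_id()->MergeFrom(frameworkId);
    message.mutable_task_id()->MergeFrom(taskId);
    send(master, message);
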
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Mon Jun 27 06:08:33 2011
@@ -8,8 +8,6 @@
#include <arpa/inet.h>
-#include <google/protobuf/descriptor.h>
-
#include <iostream>
#include <map>
#include <string>
@@ -19,14 +17,14 @@
#include <mesos/scheduler.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-
+#include <process/dispatch.hpp>
#include <process/process.hpp>
+#include <process/protobuf.hpp>
#include "configurator/configuration.hpp"
#include "common/fatal.hpp"
+#include "common/hashmap.hpp"
#include "common/lock.hpp"
#include "common/logging.hpp"
#include "common/type_utils.hpp"
@@ -35,24 +33,21 @@
#include "local/local.hpp"
-#include "messaging/messages.hpp"
+#include "messages/messages.hpp"
using namespace mesos;
using namespace mesos::internal;
-using boost::cref;
-using boost::unordered_map;
-using boost::unordered_set;
-
-using google::protobuf::RepeatedPtrField;
+using namespace process;
-using process::PID;
-using process::UPID;
+using boost::cref;
using std::map;
using std::string;
using std::vector;
+using process::wait; // Necessary on some OS's to disambiguate.
+
using std::tr1::bind;
@@ -64,81 +59,69 @@ namespace mesos { namespace internal {
// we allow friend functions to invoke 'send', 'post', etc. Therefore,
// we must make sure that any necessary synchronization is performed.
-class SchedulerProcess : public MesosProcess<SchedulerProcess>
+class SchedulerProcess : public ProtobufProcess<SchedulerProcess>
{
public:
- SchedulerProcess(MesosSchedulerDriver* _driver, Scheduler* _sched,
+ SchedulerProcess(MesosSchedulerDriver* _driver,
+ Scheduler* _sched,
const FrameworkID& _frameworkId,
const FrameworkInfo& _framework)
- : driver(_driver), sched(_sched), frameworkId(_frameworkId),
- framework(_framework), generation(0), master(UPID()), terminate(false)
- {
- install(NEW_MASTER_DETECTED, &SchedulerProcess::newMasterDetected,
- &NewMasterDetectedMessage::pid);
-
- install(NO_MASTER_DETECTED, &SchedulerProcess::noMasterDetected);
-
- install(M2F_REGISTER_REPLY, &SchedulerProcess::registerReply,
- &FrameworkRegisteredMessage::framework_id);
-
- install(M2F_RESOURCE_OFFER, &SchedulerProcess::resourceOffer,
- &ResourceOfferMessage::offer_id,
- &ResourceOfferMessage::offers,
- &ResourceOfferMessage::pids);
-
- install(M2F_RESCIND_OFFER, &SchedulerProcess::rescindOffer,
- &RescindResourceOfferMessage::offer_id);
-
- install(M2F_STATUS_UPDATE, &SchedulerProcess::statusUpdate,
- &StatusUpdateMessage::framework_id,
- &StatusUpdateMessage::status);
+ : driver(_driver),
+ sched(_sched),
+ frameworkId(_frameworkId),
+ framework(_framework),
+ generation(0),
+ master(UPID())
+ {
+ installProtobufHandler<NewMasterDetectedMessage>(
+ &SchedulerProcess::newMasterDetected,
+ &NewMasterDetectedMessage::pid);
+
+ installProtobufHandler<NoMasterDetectedMessage>(
+ &SchedulerProcess::noMasterDetected);
+
+ installProtobufHandler<FrameworkRegisteredMessage>(
+ &SchedulerProcess::registered,
+ &FrameworkRegisteredMessage::framework_id);
+
+ installProtobufHandler<ResourceOfferMessage>(
+ &SchedulerProcess::resourceOffer,
+ &ResourceOfferMessage::offer_id,
+ &ResourceOfferMessage::offers,
+ &ResourceOfferMessage::pids);
+
+ installProtobufHandler<RescindResourceOfferMessage>(
+ &SchedulerProcess::rescindOffer,
+ &RescindResourceOfferMessage::offer_id);
+
+ installProtobufHandler<StatusUpdateMessage>(
+ &SchedulerProcess::statusUpdate,
+ &StatusUpdateMessage::update,
+ &StatusUpdateMessage::pid);
+
+ installProtobufHandler<LostSlaveMessage>(
+ &SchedulerProcess::lostSlave,
+ &LostSlaveMessage::slave_id);
+
+ installProtobufHandler<ExecutorToFrameworkMessage>(
+ &SchedulerProcess::frameworkMessage,
+ &ExecutorToFrameworkMessage::slave_id,
+ &ExecutorToFrameworkMessage::framework_id,
+ &ExecutorToFrameworkMessage::executor_id,
+ &ExecutorToFrameworkMessage::data);
+
+ installProtobufHandler<FrameworkErrorMessage>(
+ &SchedulerProcess::error,
+ &FrameworkErrorMessage::code,
+ &FrameworkErrorMessage::message);
- install(M2F_LOST_SLAVE, &SchedulerProcess::lostSlave,
- &LostSlaveMessage::slave_id);
-
- install(M2F_FRAMEWORK_MESSAGE, &SchedulerProcess::frameworkMessage,
- &FrameworkMessageMessage::slave_id,
- &FrameworkMessageMessage::framework_id,
- &FrameworkMessageMessage::executor_id,
- &FrameworkMessageMessage::data);
-
- install(M2F_ERROR, &SchedulerProcess::error,
- &FrameworkErrorMessage::code,
- &FrameworkErrorMessage::message);
-
- install(process::EXITED, &SchedulerProcess::exited);
+ installMessageHandler(process::EXITED, &SchedulerProcess::exited);
}
virtual ~SchedulerProcess() {}
protected:
- virtual void operator () ()
- {
- while (true) {
- // Sending a message to terminate this process is insufficient
- // because that message might get queued behind a bunch of other
- // message. So, when it is time to terminate, we set a flag that
- // gets re-read by this process after every message. In order to
- // get this correct we must return from each invocation of
- // 'serve', to check and see if terminate has been set. In
- // addition, we need to send a dummy message right after we set
- // terminate just in case there aren't any messages in the
- // queue. Note that the terminate field is only read by this
- // process, so we don't need to protect it in any way. In fact,
- // using a lock to protect it (or for providing atomicity for
- // cleanup, for example), might lead to deadlock with the client
- // code because we already use a lock in SchedulerDriver. That
- // being said, for now we make terminate 'volatile' to guarantee
- // that each read is getting a fresh copy.
- // TODO(benh): Do a coherent read so as to avoid using
- // 'volatile'.
- if (terminate) return;
-
- serve(0, true);
- }
- }
-
- void newMasterDetected(const string& pid)
+ void newMasterDetected(const UPID& pid)
{
VLOG(1) << "New master at " << pid;
@@ -147,16 +130,16 @@ protected:
if (frameworkId == "") {
// Touched for the very first time.
- MSG<F2M_REGISTER_FRAMEWORK> out;
- out.mutable_framework()->MergeFrom(framework);
- send(master, out);
+ RegisterFrameworkMessage message;
+ message.mutable_framework()->MergeFrom(framework);
+ send(master, message);
} else {
// Not the first time, or failing over.
- MSG<F2M_REREGISTER_FRAMEWORK> out;
- out.mutable_framework()->MergeFrom(framework);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.set_generation(generation++);
- send(master, out);
+ ReregisterFrameworkMessage message;
+ message.mutable_framework()->MergeFrom(framework);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.set_generation(generation++);
+ send(master, message);
}
active = true;
@@ -170,12 +153,11 @@ protected:
active = false;
}
- void registerReply(const FrameworkID& frameworkId)
+ void registered(const FrameworkID& frameworkId)
{
VLOG(1) << "Framework registered with " << frameworkId;
this->frameworkId = frameworkId;
- process::invoke(bind(&Scheduler::registered, sched, driver,
- cref(frameworkId)));
+ invoke(bind(&Scheduler::registered, sched, driver, cref(frameworkId)));
}
void resourceOffer(const OfferID& offerId,
@@ -199,26 +181,26 @@ protected:
}
}
- process::invoke(bind(&Scheduler::resourceOffer, sched, driver,
- cref(offerId), cref(offers)));
+ invoke(bind(&Scheduler::resourceOffer, sched, driver, cref(offerId),
+ cref(offers)));
}
void rescindOffer(const OfferID& offerId)
{
VLOG(1) << "Rescinded offer " << offerId;
savedOffers.erase(offerId);
- process::invoke(bind(&Scheduler::offerRescinded, sched, driver,
- cref(offerId)));
+ invoke(bind(&Scheduler::offerRescinded, sched, driver, cref(offerId)));
}
- void statusUpdate(const FrameworkID& frameworkId, const TaskStatus& status)
+ void statusUpdate(const StatusUpdate& update, const UPID& pid)
{
+ const TaskStatus& status = update.status();
+
VLOG(1) << "Status update: task " << status.task_id()
- << " of framework " << frameworkId
- << " is now in state "
- << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+ << " of framework " << update.framework_id()
+ << " is now in state " << status.state();
- CHECK(this->frameworkId == frameworkId);
+ CHECK(frameworkId == update.framework_id());
// TODO(benh): Note that this maybe a duplicate status update!
// Once we get support to try and have a more consistent view
@@ -229,25 +211,27 @@ protected:
// multiple times (of course, if a scheduler re-uses a TaskID,
// that could be bad.
- process::invoke(bind(&Scheduler::statusUpdate, sched, driver,
- cref(status)));
+ invoke(bind(&Scheduler::statusUpdate, sched, driver, cref(status)));
- // Acknowledge the message (we do this last, after we process::invoked
- // the scheduler, if we did at all, in case it causes a crash,
- // since this way the message might get resent/routed after
- // the scheduler comes back online).
- MSG<F2M_STATUS_UPDATE_ACK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_slave_id()->MergeFrom(status.slave_id());
- out.mutable_task_id()->MergeFrom(status.task_id());
- send(master, out);
+ if (pid) {
+ // Acknowledge the message (we do this last, after we invoked
+ // the scheduler, if we did at all, in case it causes a crash,
+ // since this way the message might get resent/routed after the
+ // scheduler comes back online).
+ StatusUpdateAcknowledgementMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_slave_id()->MergeFrom(update.slave_id());
+ message.mutable_task_id()->MergeFrom(status.task_id());
+ message.set_uuid(update.uuid());
+ send(pid, message);
+ }
}
void lostSlave(const SlaveID& slaveId)
{
VLOG(1) << "Lost slave " << slaveId;
savedSlavePids.erase(slaveId);
- process::invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
+ invoke(bind(&Scheduler::slaveLost, sched, driver, cref(slaveId)));
}
void frameworkMessage(const SlaveID& slaveId,
@@ -256,15 +240,14 @@ protected:
const string& data)
{
VLOG(1) << "Received framework message";
- process::invoke(bind(&Scheduler::frameworkMessage, sched, driver,
- cref(slaveId), cref(executorId), cref(data)));
+ invoke(bind(&Scheduler::frameworkMessage, sched, driver, cref(slaveId),
+ cref(executorId), cref(data)));
}
void error(int32_t code, const string& message)
{
VLOG(1) << "Got error '" << message << "' (code: " << code << ")";
- process::invoke(bind(&Scheduler::error, sched, driver, code,
- cref(message)));
+ invoke(bind(&Scheduler::error, sched, driver, code, cref(message)));
}
void exited()
@@ -277,12 +260,16 @@ protected:
void stop()
{
+ // Whether or not we send an unregister message, we want to
+ // terminate this process ...
+ terminate(self());
+
if (!active)
return;
- MSG<F2M_UNREGISTER_FRAMEWORK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- send(master, out);
+ UnregisterFrameworkMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ send(master, message);
}
void killTask(const TaskID& taskId)
@@ -290,10 +277,10 @@ protected:
if (!active)
return;
- MSG<F2M_KILL_TASK> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_task_id()->MergeFrom(taskId);
- send(master, out);
+ KillTaskMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_task_id()->MergeFrom(taskId);
+ send(master, message);
}
void replyToOffer(const OfferID& offerId,
@@ -303,12 +290,12 @@ protected:
if (!active)
return;
- MSG<F2M_RESOURCE_OFFER_REPLY> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_offer_id()->MergeFrom(offerId);
+ ResourceOfferReplyMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_offer_id()->MergeFrom(offerId);
foreachpair (const string& key, const string& value, params) {
- Param* param = out.mutable_params()->add_param();
+ Param* param = message.mutable_params()->add_param();
param->set_key(key);
param->set_value(value);
}
@@ -318,13 +305,13 @@ protected:
// framework messages directly.
savedSlavePids[task.slave_id()] = savedOffers[offerId][task.slave_id()];
- out.add_tasks()->MergeFrom(task);
+ message.add_tasks()->MergeFrom(task);
}
// Remove the offer since we saved all the PIDs we might use.
savedOffers.erase(offerId);
- send(master, out);
+ send(master, message);
}
void reviveOffers()
@@ -332,9 +319,9 @@ protected:
if (!active)
return;
- MSG<F2M_REVIVE_OFFERS> out;
- out.mutable_framework_id()->MergeFrom(frameworkId);
- send(master, out);
+ ReviveOffersMessage message;
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ send(master, message);
}
void sendFrameworkMessage(const SlaveID& slaveId,
@@ -357,23 +344,22 @@ protected:
UPID slave = savedSlavePids[slaveId];
CHECK(slave != UPID());
- // TODO(benh): This is kind of wierd, M2S?
- MSG<M2S_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(slave, out);
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(slave, message);
} else {
VLOG(1) << "Cannot send directly to slave " << slaveId
<< "; sending through master";
- MSG<F2M_FRAMEWORK_MESSAGE> out;
- out.mutable_slave_id()->MergeFrom(slaveId);
- out.mutable_framework_id()->MergeFrom(frameworkId);
- out.mutable_executor_id()->MergeFrom(executorId);
- out.set_data(data);
- send(master, out);
+ FrameworkToExecutorMessage message;
+ message.mutable_slave_id()->MergeFrom(slaveId);
+ message.mutable_framework_id()->MergeFrom(frameworkId);
+ message.mutable_executor_id()->MergeFrom(executorId);
+ message.set_data(data);
+ send(master, message);
}
}
@@ -388,10 +374,9 @@ private:
UPID master;
volatile bool active;
- volatile bool terminate;
- unordered_map<OfferID, unordered_map<SlaveID, UPID> > savedOffers;
- unordered_map<SlaveID, UPID> savedSlavePids;
+ hashmap<OfferID, hashmap<SlaveID, UPID> > savedOffers;
+ hashmap<SlaveID, UPID> savedSlavePids;
};
}} // namespace mesos { namespace internal {
@@ -526,7 +511,7 @@ MesosSchedulerDriver::~MesosSchedulerDri
// to the user somehow. Note that we will also wait forever if
// MesosSchedulerDriver::stop was never called.
if (process != NULL) {
- process::wait(process->self());
+ wait(process);
delete process;
}
@@ -584,7 +569,7 @@ int MesosSchedulerDriver::start()
process = new SchedulerProcess(this, sched, frameworkId, framework);
- UPID pid = process::spawn(process);
+ UPID pid = spawn(process);
// Check and see if we need to launch a local cluster.
if (url == "local") {
@@ -616,9 +601,7 @@ int MesosSchedulerDriver::stop()
// getExecutorInfo which threw exceptions, or explicitly called
// stop. See above in start).
if (process != NULL) {
- process::dispatch(process->self(), &SchedulerProcess::stop);
- process->terminate = true;
- process::post(process->self(), process::TERMINATE);
+ dispatch(process, &SchedulerProcess::stop);
}
running = false;
@@ -661,8 +644,7 @@ int MesosSchedulerDriver::killTask(const
return -1;
}
- process::dispatch(process->self(), &SchedulerProcess::killTask,
- taskId);
+ dispatch(process, &SchedulerProcess::killTask, taskId);
return 0;
}
@@ -678,8 +660,7 @@ int MesosSchedulerDriver::replyToOffer(c
return -1;
}
- process::dispatch(process->self(), &SchedulerProcess::replyToOffer,
- offerId, tasks, params);
+ dispatch(process, &SchedulerProcess::replyToOffer, offerId, tasks, params);
return 0;
}
@@ -693,7 +674,7 @@ int MesosSchedulerDriver::reviveOffers()
return -1;
}
- process::dispatch(process->self(), &SchedulerProcess::reviveOffers);
+ dispatch(process, &SchedulerProcess::reviveOffers);
return 0;
}
@@ -709,8 +690,8 @@ int MesosSchedulerDriver::sendFrameworkM
return -1;
}
- process::dispatch(process->self(), &SchedulerProcess::sendFrameworkMessage,
- slaveId, executorId, data);
+ dispatch(process, &SchedulerProcess::sendFrameworkMessage,
+ slaveId, executorId, data);
return 0;
}
Modified: incubator/mesos/trunk/src/slave/isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -6,12 +6,10 @@
#include "lxc_isolation_module.hpp"
#endif
-using std::string;
-using namespace mesos::internal::slave;
+namespace mesos { namespace internal { namespace slave {
-
-IsolationModule * IsolationModule::create(const string &type)
+IsolationModule* IsolationModule::create(const std::string &type)
{
if (type == "process")
return new ProcessBasedIsolationModule();
@@ -27,8 +25,11 @@ IsolationModule * IsolationModule::creat
}
-void IsolationModule::destroy(IsolationModule *module)
+void IsolationModule::destroy(IsolationModule* module)
{
- if (module != NULL)
+ if (module != NULL) {
delete module;
+ }
}
+
+}}} // namespace mesos { namespace internal { namespace slave {
Modified: incubator/mesos/trunk/src/slave/isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -3,36 +3,51 @@
#include <string>
+#include <mesos/mesos.hpp>
+
+#include <process/process.hpp>
+
+#include "configurator/configuration.hpp"
+
+#include "common/resources.hpp"
+
namespace mesos { namespace internal { namespace slave {
class Slave;
-class Framework;
-class Executor;
-class IsolationModule {
+class IsolationModule : public process::Process<IsolationModule>
+{
public:
- static IsolationModule * create(const std::string &type);
- static void destroy(IsolationModule *module);
+ static IsolationModule* create(const std::string& type);
+ static void destroy(IsolationModule* module);
virtual ~IsolationModule() {}
// Called during slave initialization.
- virtual void initialize(Slave *slave) {}
+ virtual void initialize(const Configuration& conf,
+ bool local,
+ const process::PID<Slave>& slave) = 0;
// Called by the slave to launch an executor for a given framework.
- virtual void launchExecutor(Framework* framework, Executor* executor) = 0;
+ virtual void launchExecutor(const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory) = 0;
// Terminate a framework's executor, if it is still running.
// The executor is expected to be gone after this method exits.
- virtual void killExecutor(Framework* framework, Executor* executor) = 0;
+ virtual void killExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId) = 0;
// Update the resource limits for a given framework. This method will
// be called only after an executor for the framework is started.
- virtual void resourcesChanged(Framework *framework, Executor* executor) {}
+ virtual void resourcesChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources) = 0;
};
-}}}
+}}} // namespace mesos { namespace internal { namespace slave {
-#endif /* __ISOLATION_MODULE_HPP__ */
+#endif // __ISOLATION_MODULE_HPP__
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -1,277 +1,342 @@
-#include <stdlib.h>
-#include <unistd.h>
-
#include <algorithm>
+#include <sstream>
+#include <map>
+
+#include <process/dispatch.hpp>
#include "lxc_isolation_module.hpp"
#include "common/foreach.hpp"
+#include "common/type_utils.hpp"
+#include "common/units.hpp"
+#include "common/utils.hpp"
#include "launcher/launcher.hpp"
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::list;
-using std::make_pair;
-using std::max;
-using std::ostringstream;
-using std::pair;
-using std::queue;
-using std::string;
-using std::vector;
-
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
-
using namespace mesos;
using namespace mesos::internal;
-using namespace mesos::internal::launcher;
using namespace mesos::internal::slave;
+using namespace process;
+
+using launcher::ExecutorLauncher;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::map;
+using std::max;
+using std::string;
+
+
namespace {
const int32_t CPU_SHARES_PER_CPU = 1024;
const int32_t MIN_CPU_SHARES = 10;
-const int64_t MIN_RSS = 128 * Megabyte;
+const int64_t MIN_RSS_MB = 128 * Megabyte;
+
+// TODO(benh): Factor this out into common/utils or possibly into
+// libprocess so that it can handle blocking.
+// Run a shell command formatted with varargs and return its exit code.
+int shell(const char* format, ...)
+{
+ char* cmd;
+ FILE* f;
+ int ret;
+ va_list args;
+ va_start(args, format);
+ if (vasprintf(&cmd, format, args) == -1)
+ return -1;
+ if ((f = popen(cmd, "w")) == NULL)
+ return -1;
+ ret = pclose(f);
+ if (ret == -1)
+ LOG(INFO) << "pclose error: " << strerror(errno);
+ free(cmd);
+ va_end(args);
+ return ret;
}
+// Attempt to set a resource limit of a container for a given cgroup
+// property (e.g. cpu.shares). Returns true on success.
+bool setResourceLimit(const string& container,
+ const string& property,
+ int64_t value)
+{
+ LOG(INFO) << "Setting " << property
+ << " for container " << container
+ << " to " << value;
+
+ int ret = shell("lxc-cgroup -n %s %s %lld",
+ container.c_str(),
+ property.c_str(),
+ value);
+ if (ret != 0) {
+ LOG(ERROR) << "Failed to set " << property
+ << " for container " << container
+ << ": lxc-cgroup returned " << ret;
+ return false;
+ }
+
+ return true;
+}
+
+} // namespace {
+
+
LxcIsolationModule::LxcIsolationModule()
- : initialized(false) {}
+ : initialized(false)
+{
+ // Spawn the reaper, note that it might send us a message before we
+ // actually get spawned ourselves, but that's okay, the message will
+ // just get dropped.
+ reaper = new Reaper();
+ spawn(reaper);
+ dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
LxcIsolationModule::~LxcIsolationModule()
{
- // We want to wait until the reaper has completed because it
- // accesses 'this' in order to make callbacks ... deleting 'this'
- // could thus lead to a seg fault!
- if (initialized) {
- CHECK(reaper != NULL);
- process::post(reaper->self(), process::TERMINATE);
- process::wait(reaper->self());
- delete reaper;
- }
+ CHECK(reaper != NULL);
+ terminate(reaper);
+ wait(reaper);
+ delete reaper;
}
-void LxcIsolationModule::initialize(Slave* slave)
+void LxcIsolationModule::initialize(
+ const Configuration& _conf,
+ bool _local,
+ const PID<Slave>& _slave)
{
- this->slave = slave;
+ conf = _conf;
+ local = _local;
+ slave = _slave;
- // Run a basic check to see whether Linux Container tools are available
+ // Check if Linux Container tools are available.
if (system("lxc-version > /dev/null") != 0) {
LOG(FATAL) << "Could not run lxc-version; make sure Linux Container "
<< "tools are installed";
}
// Check that we are root (it might also be possible to create Linux
- // containers without being root, but we can support that later)
+ // containers without being root, but we can support that later).
if (getuid() != 0) {
LOG(FATAL) << "LXC isolation module requires slave to run as root";
}
- reaper = new Reaper(this);
- process::spawn(reaper);
initialized = true;
}
-void LxcIsolationModule::launchExecutor(Framework* framework, Executor* executor)
+void LxcIsolationModule::launchExecutor(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory)
{
if (!initialized) {
LOG(FATAL) << "Cannot launch executors before initialization!";
}
- infos[framework->frameworkId][executor->info.executor_id()] = new FrameworkInfo();
+ const ExecutorID& executorId = executorInfo.executor_id();
+
+ LOG(INFO) << "Launching '" << executorInfo.uri()
+ << "' for executor '" << executorId
+ << "' of framework " << frameworkId;
- LOG(INFO) << "Starting executor for framework " << framework->frameworkId << ": "
- << executor->info.uri();
+ // Create a name for the container.
+ std::ostringstream out;
+ out << "mesos.executor-" << executorId
+ << ".framework-" << frameworkId;
- // Get location of Mesos install in order to find mesos-launcher.
- string mesosHome = slave->getConfiguration().get("home", ".");
- string mesosLauncher = mesosHome + "/mesos-launcher";
-
- // Create a name for the container
- ostringstream oss;
- oss << "mesos.slave-" << slave->slaveId
- << ".framework-" << framework->frameworkId;
- string containerName = oss.str();
+ const string& container = out.str();
- infos[framework->frameworkId][executor->info.executor_id()]->container = containerName;
- executor->executorStatus = "Container: " + containerName;
+ ContainerInfo* info = new ContainerInfo();
+
+ info->frameworkId = frameworkId;
+ info->executorId = executorId;
+ info->container = container;
+ info->pid = -1;
+
+ infos[frameworkId][executorId] = info;
// Run lxc-execute mesos-launcher using a fork-exec (since lxc-execute
// does not return until the container is finished). Note that lxc-execute
// automatically creates the container and will delete it when finished.
pid_t pid;
- if ((pid = fork()) == -1)
+ if ((pid = fork()) == -1) {
PLOG(FATAL) << "Failed to fork to launch lxc-execute";
+ }
if (pid) {
- // In parent process
- infos[framework->frameworkId][executor->info.executor_id()]->lxcExecutePid = pid;
- LOG(INFO) << "Started child for lxc-execute, pid = " << pid;
- int status;
- } else {
- // Create an ExecutorLauncher to set up the environment for executing
- // an extrernal launcher_main.cpp process (inside of lxc-execute).
+ // In parent process.
+ info->pid = pid;
- const Configuration& conf = slave->getConfiguration();
+ // Tell the slave this executor has started.
+ dispatch(slave, &Slave::executorStarted,
+ frameworkId, executorId, pid);
+ } else {
+ // Close unnecessary file descriptors. Note that we are assuming
+ // stdin, stdout, and stderr can ONLY be found at the POSIX
+ // specified file numbers (0, 1, 2).
+ foreach (const string& entry, utils::os::listdir("/proc/self/fd")) {
+ if (entry != "." && entry != "..") {
+ try {
+ int fd = boost::lexical_cast<int>(entry);
+ if (fd != STDIN_FILENO &&
+ fd != STDOUT_FILENO &&
+ fd != STDERR_FILENO) {
+ close(fd);
+ }
+ } catch (boost::bad_lexical_cast&) {
+ LOG(FATAL) << "Failed to close file descriptors";
+ }
+ }
+ }
+ // Create an ExecutorLauncher to set up the environment for executing
+ // an external launcher_main.cpp process (inside of lxc-execute).
map<string, string> params;
- for (int i = 0; i < framework->info.executor().params().param_size(); i++) {
- params[framework->info.executor().params().param(i).key()] =
- framework->info.executor().params().param(i).value();
+ for (int i = 0; i < executorInfo.params().param_size(); i++) {
+ params[executorInfo.params().param(i).key()] =
+ executorInfo.params().param(i).value();
}
- ExecutorLauncher* launcher;
- launcher =
- new ExecutorLauncher(framework->frameworkId,
- executor->info.executor_id(),
- executor->info.uri(),
- framework->info.user(),
- slave->getUniqueWorkDirectory(framework->frameworkId,
- executor->info.executor_id()),
- slave->self(),
+ ExecutorLauncher* launcher =
+ new ExecutorLauncher(frameworkId,
+ executorId,
+ executorInfo.uri(),
+ frameworkInfo.user(),
+ directory,
+ slave,
conf.get("frameworks_home", ""),
conf.get("home", ""),
conf.get("hadoop_home", ""),
- !slave->local,
+ !local,
conf.get("switch_user", true),
+ container,
params);
+
launcher->setupEnvironmentForLauncherMain();
+
+ // Get location of Mesos install in order to find mesos-launcher.
+ string mesosLauncher = conf.get("home", ".") + "/bin/mesos-launcher";
// Run lxc-execute.
- execlp("lxc-execute", "lxc-execute", "-n", containerName.c_str(),
+ execlp("lxc-execute", "lxc-execute", "-n", container.c_str(),
mesosLauncher.c_str(), (char *) NULL);
- // If we get here, the execl call failed.
- fatalerror("Could not exec lxc-execute");
- // TODO: Exit the slave if this happens
+
+ // If we get here, the execlp call failed.
+ LOG(FATAL) << "Could not exec lxc-execute";
}
}
-void LxcIsolationModule::killExecutor(Framework* framework, Executor* executor)
+void LxcIsolationModule::killExecutor(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
{
- string container = infos[framework->frameworkId][executor->info.executor_id()]->container;
- if (container != "") {
- LOG(INFO) << "Stopping container " << container;
- int ret = shell("lxc-stop -n %s", container.c_str());
- if (ret != 0)
- LOG(ERROR) << "lxc-stop returned " << ret;
- infos[framework->frameworkId][executor->info.executor_id()]->container = "";
- executor->executorStatus = "No executor running";
- delete infos[framework->frameworkId][executor->info.executor_id()];
- infos[framework->frameworkId].erase(executor->info.executor_id());
+ if (!infos.contains(frameworkId) ||
+ !infos[frameworkId].contains(executorId)) {
+ LOG(ERROR) << "ERROR! Asked to kill an unknown executor!";
+ return;
}
-}
-
-void LxcIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
-{
- if (infos[framework->frameworkId][executor->info.executor_id()]->container != "") {
- // For now, just try setting the CPUs and memory right away, and kill the
- // framework if this fails.
- // A smarter thing to do might be to only update them periodically in a
- // separate thread, and to give frameworks some time to scale down their
- // memory usage.
-
- double cpu = executor->resources.getScalar("cpu", Resource::Scalar()).value();
- int32_t cpuShares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
- if (!setResourceLimit(framework, executor, "cpu.shares", cpuShares)) {
- // Tell slave to kill framework, which will invoke killExecutor.
- slave->killFramework(framework);
- return;
- }
+ ContainerInfo* info = infos[frameworkId][executorId];
- double mem = executor->resources.getScalar("mem", Resource::Scalar()).value();
- int64_t rssLimit = max((int64_t) mem, MIN_RSS) * 1024LL * 1024LL;
- if (!setResourceLimit(framework, executor, "memory.limit_in_bytes", rssLimit)) {
- // Tell slave to kill framework, which will invoke killExecutor.
- slave->killFramework(framework);
- return;
- }
- }
-}
+ CHECK(info->container != "");
+ LOG(INFO) << "Stopping container " << info->container;
-bool LxcIsolationModule::setResourceLimit(Framework* framework,
- Executor* executor,
- const string& property,
- int64_t value)
-{
- LOG(INFO) << "Setting " << property << " for framework " << framework->frameworkId
- << " to " << value;
- int ret = shell("lxc-cgroup -n %s %s %lld",
- infos[framework->frameworkId][executor->info.executor_id()]->container.c_str(),
- property.c_str(),
- value);
+ int ret = shell("lxc-stop -n %s", info->container.c_str());
if (ret != 0) {
- LOG(ERROR) << "Failed to set " << property << " for framework " << framework->frameworkId
- << ": lxc-cgroup returned " << ret;
- return false;
+ LOG(ERROR) << "lxc-stop returned " << ret;
+ }
+
+ if (infos[frameworkId].size() == 1) {
+ infos.erase(frameworkId);
} else {
- return true;
+ infos[frameworkId].erase(executorId);
}
+
+ delete info;
+
+ // NOTE: Both frameworkId and executorId are no longer valid because
+ // they have just been deleted above!
}
-int LxcIsolationModule::shell(const char* fmt, ...)
+void LxcIsolationModule::resourcesChanged(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources)
{
- char *cmd;
- FILE *f;
- int ret;
- va_list args;
- va_start(args, fmt);
- if (vasprintf(&cmd, fmt, args) == -1)
- return -1;
- if ((f = popen(cmd, "w")) == NULL)
- return -1;
- ret = pclose(f);
- if (ret == -1)
- LOG(INFO) << "pclose error: " << strerror(errno);
- free(cmd);
- va_end(args);
- return ret;
-}
+ if (!infos.contains(frameworkId) ||
+ !infos[frameworkId].contains(executorId)) {
+ LOG(ERROR) << "ERROR! Asked to update resources for an unknown executor!";
+ return;
+ }
+ ContainerInfo* info = infos[frameworkId][executorId];
-LxcIsolationModule::Reaper::Reaper(LxcIsolationModule* m)
- : module(m)
-{}
+ CHECK(info->container != "");
-
-void LxcIsolationModule::Reaper::operator () ()
+ const string& container = info->container;
+
+ // For now, just try setting the CPUs and memory right away, and kill the
+ // framework if this fails (needs to be fixed).
+ // A smarter thing to do might be to only update them periodically in a
+ // separate thread, and to give frameworks some time to scale down their
+ // memory usage.
+ string property;
+ uint64_t value;
+
+ double cpu = resources.getScalar("cpu", Resource::Scalar()).value();
+ int32_t cpuShares = max(CPU_SHARES_PER_CPU * (int32_t) cpu, MIN_CPU_SHARES);
+
+ property = "cpu.shares";
+ value = cpuShares;
+
+ if (!setResourceLimit(container, property, value)) {
+ // TODO(benh): Kill the executor, but do it in such a way that
+ // the slave finds out about it exiting.
+ return;
+ }
+
+ double mem = resources.getScalar("mem", Resource::Scalar()).value();
+ int64_t rssLimit = max((int64_t) mem, MIN_RSS_MB) * 1024LL * 1024LL;
+
+ property = "memory.limit_in_bytes";
+ value = rssLimit;
+
+ if (!setResourceLimit(container, property, value)) {
+ // TODO(benh): Kill the executor, but do it in such a way that
+ // the slave finds out about it exiting.
+ return;
+ }
+}
+
+
+void LxcIsolationModule::processExited(pid_t pid, int status)
{
- link(module->slave->self());
- while (true) {
- receive(1);
- if (name() == process::TIMEOUT) {
- // Check whether any child process has exited
- pid_t pid;
- int status;
- if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
- foreachpair (const FrameworkID& frameworkId, _, module->infos) {
- foreachpair (const ExecutorID& executorId, FrameworkInfo* info, module->infos[frameworkId]) {
- if (info->lxcExecutePid == pid) {
- info->lxcExecutePid = -1;
- info->container = "";
- LOG(INFO) << "Telling slave of lost framework " << frameworkId;
- // TODO(benh): This is broken if/when libprocess is parallel!
- module->slave->executorExited(frameworkId, executorId, status);
- delete module->infos[frameworkId][executorId];
- module->infos[frameworkId].erase(executorId);
- break;
- }
- }
- }
+ foreachkey (const FrameworkID& frameworkId, infos) {
+ foreachvalue (ContainerInfo* info, infos[frameworkId]) {
+ if (info->pid == pid) {
+ LOG(INFO) << "Telling slave of lost executor "
+ << info->executorId
+ << " of framework " << info->frameworkId;
+
+ dispatch(slave, &Slave::executorExited,
+ info->frameworkId, info->executorId, status);
+
+ // Try to clean up after the executor.
+ killExecutor(info->frameworkId, info->executorId);
+ return;
}
- } else if (name() == process::TERMINATE || name() == process::EXITED) {
- return;
}
}
}
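
For readers following the refactoring above: both isolation modules now register with a shared Reaper (added elsewhere in this revision as reaper.hpp/reaper.cpp, not shown in this hunk) instead of running their own waitpid() loop. The following is a minimal, self-contained sketch, not part of this commit, of the non-blocking waitpid(WNOHANG) polling such a reaper performs before invoking processExited(pid, status) on its listeners; the ExitListener interface here is only a stand-in for ProcessExitedListener.

// Sketch only: poll for exited children without blocking and notify
// listeners, mirroring the inline reaper loops removed above.
#include <sys/types.h>
#include <sys/wait.h>

#include <vector>

struct ExitListener // Stand-in for ProcessExitedListener.
{
  virtual ~ExitListener() {}
  virtual void processExited(pid_t pid, int status) = 0;
};

// Reap every child that has already exited and tell the listeners;
// returns the number of children reaped. A real reaper would call this
// periodically (e.g. once a second) from its own libprocess process.
int reapOnce(const std::vector<ExitListener*>& listeners)
{
  int reaped = 0;
  pid_t pid;
  int status;
  while ((pid = waitpid(-1, &status, WNOHANG)) > 0) {
    for (size_t i = 0; i < listeners.size(); i++) {
      listeners[i]->processExited(pid, status);
    }
    reaped++;
  }
  return reaped;
}
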
Modified: incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp (original)
+++ incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp Mon Jun 27 06:08:33 2011
@@ -3,66 +3,64 @@
#include <string>
-#include <boost/unordered_map.hpp>
-
#include "isolation_module.hpp"
+#include "reaper.hpp"
#include "slave.hpp"
-#include "messaging/messages.hpp"
+#include "common/hashmap.hpp"
namespace mesos { namespace internal { namespace slave {
-using std::string;
-using boost::unordered_map;
-
-class LxcIsolationModule : public IsolationModule {
+class LxcIsolationModule
+ : public IsolationModule, public ProcessExitedListener
+{
public:
LxcIsolationModule();
virtual ~LxcIsolationModule();
- virtual void initialize(Slave* slave);
-
- virtual void launchExecutor(Framework* framework, Executor* executor);
-
- virtual void killExecutor(Framework* framework, Executor* executor);
-
- virtual void resourcesChanged(Framework* framework, Executor* executor);
+ virtual void initialize(const Configuration& conf,
+ bool local,
+ const process::PID<Slave>& slave);
+
+ virtual void launchExecutor(const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const std::string& directory);
+
+ virtual void killExecutor(const FrameworkID& frameworkId,
+ const ExecutorID& executorId);
+
+ virtual void resourcesChanged(const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources);
-protected:
- // Run a shell command formatted with varargs and return its exit code.
- int shell(const char* format, ...);
-
- // Attempt to set a resource limit of a framework's container for a given
- // cgroup property (e.g. cpu.shares). Returns true on success.
- bool setResourceLimit(Framework* framework, Executor* executor,
- const string& property, int64_t value);
+ virtual void processExited(pid_t pid, int status);
private:
- // Reaps framework containers and tells the slave if they exit
- class Reaper : public process::Process<Reaper> {
- LxcIsolationModule* module;
-
- protected:
- virtual void operator () ();
-
- public:
- Reaper(LxcIsolationModule* module);
- };
-
- // Per-framework information object maintained in info hashmap
- struct FrameworkInfo {
- string container; // Name of Linux container used for this framework
- pid_t lxcExecutePid; // PID of lxc-execute command running the executor
+ // No copying, no assigning.
+ LxcIsolationModule(const LxcIsolationModule&);
+ LxcIsolationModule& operator = (const LxcIsolationModule&);
+
+ // Per-framework information object maintained in info hashmap.
+ struct ContainerInfo
+ {
+ FrameworkID frameworkId;
+ ExecutorID executorId;
+ std::string container; // Name of Linux container used for this framework.
+ pid_t pid; // PID of lxc-execute command running the executor.
};
+ // TODO(benh): Make variables const by passing them via constructor.
+ Configuration conf;
+ bool local;
+ process::PID<Slave> slave;
bool initialized;
- Slave* slave;
- boost::unordered_map<FrameworkID, boost::unordered_map<ExecutorID, FrameworkInfo*> > infos;
Reaper* reaper;
+ hashmap<FrameworkID, hashmap<ExecutorID, ContainerInfo*> > infos;
};
-}}}
+}}} // namespace mesos { namespace internal { namespace slave {
-#endif /* __LXC_ISOLATION_MODULE_HPP__ */
+#endif // __LXC_ISOLATION_MODULE_HPP__
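
The per-executor bookkeeping above keys ContainerInfo structs first by framework and then by executor. Below is a minimal sketch of that two-level insert/lookup/erase pattern, with std::map standing in for the hashmap defined in common/hashmap.hpp and std::string standing in for FrameworkID/ExecutorID (both substitutions are assumptions, since neither type is shown in this hunk).

#include <sys/types.h>

#include <map>
#include <string>

// Sketch only: mirrors struct ContainerInfo from the header above.
struct ContainerInfo
{
  std::string frameworkId;
  std::string executorId;
  std::string container;  // Name of the Linux container.
  pid_t pid;              // PID of the lxc-execute command.
};

typedef std::map<std::string, std::map<std::string, ContainerInfo*> > InfoMap;

// Remove one executor's entry, dropping the whole framework entry when
// its last executor goes away (the same pattern killExecutor() uses).
void removeExecutor(InfoMap& infos,
                    const std::string& frameworkId,
                    const std::string& executorId)
{
  if (infos.count(frameworkId) == 0 ||
      infos[frameworkId].count(executorId) == 0) {
    return;  // Unknown executor; nothing to do.
  }

  ContainerInfo* info = infos[frameworkId][executorId];

  if (infos[frameworkId].size() == 1) {
    infos.erase(frameworkId);
  } else {
    infos[frameworkId].erase(executorId);
  }

  delete info;
}

Erasing the outer entry once its inner map would become empty keeps iteration in processExited() from visiting frameworks that no longer have executors.
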
Modified: incubator/mesos/trunk/src/slave/main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/main.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/main.cpp (original)
+++ incubator/mesos/trunk/src/slave/main.cpp Mon Jun 27 06:08:33 2011
@@ -1,3 +1,5 @@
+#include <libgen.h>
+
#include "common/build.hpp"
#include "common/logging.hpp"
@@ -12,9 +14,6 @@
using namespace mesos::internal;
using namespace mesos::internal::slave;
-using boost::bad_lexical_cast;
-using boost::lexical_cast;
-
using std::cerr;
using std::endl;
using std::string;
Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1140024&r1=1140023&r2=1140024&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Mon Jun 27 06:08:33 2011
@@ -1,179 +1,197 @@
+#include <signal.h>
+
#include <map>
-#include <vector>
+
+#include <process/dispatch.hpp>
#include "process_based_isolation_module.hpp"
#include "common/foreach.hpp"
+#include "common/type_utils.hpp"
using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::slave;
-using boost::lexical_cast;
-using boost::unordered_map;
-using boost::unordered_set;
+using namespace process;
using launcher::ExecutorLauncher;
-using std::cerr;
-using std::cout;
-using std::endl;
-using std::list;
-using std::make_pair;
using std::map;
-using std::ostringstream;
-using std::pair;
-using std::queue;
using std::string;
-using std::vector;
+
+using process::wait; // Necessary on some OSes to disambiguate.
ProcessBasedIsolationModule::ProcessBasedIsolationModule()
- : initialized(false) {}
+ : initialized(false)
+{
+ // Spawn the reaper. Note that it might send us a message before we
+ // actually get spawned ourselves, but that's okay: the message will
+ // just get dropped.
+ reaper = new Reaper();
+ spawn(reaper);
+ dispatch(reaper, &Reaper::addProcessExitedListener, this);
+}
ProcessBasedIsolationModule::~ProcessBasedIsolationModule()
{
- // We need to wait until the reaper has completed because it
- // accesses 'this' in order to make callbacks ... deleting 'this'
- // could thus lead to a seg fault!
- if (initialized) {
- CHECK(reaper != NULL);
- process::post(reaper->self(), process::TERMINATE);
- process::wait(reaper->self());
- delete reaper;
- }
+ CHECK(reaper != NULL);
+ terminate(reaper);
+ wait(reaper);
+ delete reaper;
}
-void ProcessBasedIsolationModule::initialize(Slave *slave)
+void ProcessBasedIsolationModule::initialize(
+ const Configuration& _conf,
+ bool _local,
+ const PID<Slave>& _slave)
{
- this->slave = slave;
- reaper = new Reaper(this);
- process::spawn(reaper);
+ conf = _conf;
+ local = _local;
+ slave = _slave;
+
initialized = true;
}
-void ProcessBasedIsolationModule::launchExecutor(Framework* framework, Executor* executor)
+void ProcessBasedIsolationModule::launchExecutor(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory)
{
- if (!initialized)
+ if (!initialized) {
LOG(FATAL) << "Cannot launch executors before initialization!";
+ }
- LOG(INFO) << "Starting executor for framework " << framework->frameworkId
- << ": " << executor->info.uri();
+ LOG(INFO) << "Launching '" << executorInfo.uri()
+ << "' for executor '" << executorInfo.executor_id()
+ << "' of framework " << frameworkId;
pid_t pid;
- if ((pid = fork()) == -1)
+ if ((pid = fork()) == -1) {
PLOG(FATAL) << "Failed to fork to launch new executor";
+ }
if (pid) {
// In parent process, record the pgid for killpg later.
LOG(INFO) << "Started executor, OS pid = " << pid;
- pgids[framework->frameworkId][executor->info.executor_id()] = pid;
- executor->executorStatus = "PID: " + lexical_cast<string>(pid);
+ pgids[frameworkId][executorInfo.executor_id()] = pid;
+
+ // Tell the slave this executor has started.
+ dispatch(slave, &Slave::executorStarted,
+ frameworkId, executorInfo.executor_id(), pid);
} else {
// In child process, make cleanup easier.
-// if (setpgid(0, 0) < 0)
-// PLOG(FATAL) << "Failed to put executor in own process group";
- if ((pid = setsid()) == -1)
+ if ((pid = setsid()) == -1) {
PLOG(FATAL) << "Failed to put executor in own session";
-
- createExecutorLauncher(framework, executor)->run();
+ }
+
+ ExecutorLauncher* launcher =
+ createExecutorLauncher(frameworkId, frameworkInfo,
+ executorInfo, directory);
+
+ launcher->run();
}
}
-void ProcessBasedIsolationModule::killExecutor(Framework* framework, Executor* executor)
-{
- if (pgids[framework->frameworkId][executor->info.executor_id()] != -1) {
+void ProcessBasedIsolationModule::killExecutor(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId)
+{
+ if (!pgids.contains(frameworkId) ||
+ !pgids[frameworkId].contains(executorId)) {
+ LOG(ERROR) << "ERROR! Asked to kill an unknown executor!";
+ return;
+ }
+
+ if (pgids[frameworkId][executorId] != -1) {
// TODO(benh): Consider sending a SIGTERM, then after so much time
// if it still hasn't exited do a SIGKILL (can use a libprocess
- // process for this).
- LOG(INFO) << "Sending SIGKILL to gpid "
- << pgids[framework->frameworkId][executor->info.executor_id()];
- killpg(pgids[framework->frameworkId][executor->info.executor_id()], SIGKILL);
- pgids[framework->frameworkId][executor->info.executor_id()] = -1;
- executor->executorStatus = "No executor running";
+ // process for this). This might not be necessary because we have
+ // higher-level semantics via the first shutdown phase that gets
+ // initiated by the slave.
+ LOG(INFO) << "Sending SIGKILL to process group "
+ << pgids[frameworkId][executorId];
+
+ killpg(pgids[frameworkId][executorId], SIGKILL);
+
+ if (pgids[frameworkId].size() == 1) {
+ pgids.erase(frameworkId);
+ } else {
+ pgids[frameworkId].erase(executorId);
+ }
+
+ // NOTE: Both frameworkId and executorId are no longer valid
+ // because they have just been deleted above!
// TODO(benh): Kill all of the process's descendants? Perhaps
// create a new libprocess process that continually tries to kill
// all the processes that are a descendant of the executor, trying
// to kill the executor last ... maybe this is just too much of a
// burden?
-
- pgids[framework->frameworkId].erase(executor->info.executor_id());
}
}
-void ProcessBasedIsolationModule::resourcesChanged(Framework* framework, Executor* executor)
+void ProcessBasedIsolationModule::resourcesChanged(
+ const FrameworkID& frameworkId,
+ const ExecutorID& executorId,
+ const Resources& resources)
{
// Do nothing; subclasses may override this.
}
-ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(Framework* framework, Executor* executor)
+ExecutorLauncher* ProcessBasedIsolationModule::createExecutorLauncher(
+ const FrameworkID& frameworkId,
+ const FrameworkInfo& frameworkInfo,
+ const ExecutorInfo& executorInfo,
+ const string& directory)
{
// Create a map of parameters for the executor launcher.
map<string, string> params;
- for (int i = 0; i < executor->info.params().param_size(); i++) {
- params[executor->info.params().param(i).key()] =
- executor->info.params().param(i).value();
- }
-
- return
- new ExecutorLauncher(framework->frameworkId,
- executor->info.executor_id(),
- executor->info.uri(),
- framework->info.user(),
- slave->getUniqueWorkDirectory(framework->frameworkId,
- executor->info.executor_id()),
- slave->self(),
- slave->getConfiguration().get("frameworks_home", ""),
- slave->getConfiguration().get("home", ""),
- slave->getConfiguration().get("hadoop_home", ""),
- !slave->local,
- slave->getConfiguration().get("switch_user", true),
- params);
+ for (int i = 0; i < executorInfo.params().param_size(); i++) {
+ params[executorInfo.params().param(i).key()] =
+ executorInfo.params().param(i).value();
+ }
+
+ return new ExecutorLauncher(frameworkId,
+ executorInfo.executor_id(),
+ executorInfo.uri(),
+ frameworkInfo.user(),
+ directory,
+ slave,
+ conf.get("frameworks_home", ""),
+ conf.get("home", ""),
+ conf.get("hadoop_home", ""),
+ !local,
+ conf.get("switch_user", true),
+ "",
+ params);
}
-ProcessBasedIsolationModule::Reaper::Reaper(ProcessBasedIsolationModule* m)
- : module(m)
-{}
-
-
-void ProcessBasedIsolationModule::Reaper::operator () ()
-{
- link(module->slave->self());
- while (true) {
- receive(1);
- if (name() == process::TIMEOUT) {
- // Check whether any child process has exited.
- pid_t pid;
- int status;
- if ((pid = waitpid((pid_t) -1, &status, WNOHANG)) > 0) {
- foreachpair (const FrameworkID& frameworkId, _, module->pgids) {
- foreachpair (const ExecutorID& executorId, pid_t pgid, module->pgids[frameworkId]) {
- if (pgid == pid) {
- // Kill the process group to clean up the tasks.
- LOG(INFO) << "Sending SIGKILL to gpid " << pgid;
- killpg(pgid, SIGKILL);
- module->pgids[frameworkId][executorId] = -1;
- LOG(INFO) << "Telling slave of lost executor " << executorId
- << " of framework " << frameworkId;
- // TODO(benh): This is broken if/when libprocess is parallel!
- module->slave->executorExited(frameworkId, executorId, status);
- module->pgids[frameworkId].erase(executorId);
- break;
- }
- }
- }
+void ProcessBasedIsolationModule::processExited(pid_t pid, int status)
+{
+ foreachkey (const FrameworkID& frameworkId, pgids) {
+ foreachpair (const ExecutorID& executorId, pid_t pgid, pgids[frameworkId]) {
+ if (pgid == pid) {
+ LOG(INFO) << "Telling slave of lost executor " << executorId
+ << " of framework " << frameworkId;
+
+ dispatch(slave, &Slave::executorExited,
+ frameworkId, executorId, status);
+
+ // Try to clean up after the executor.
+ killExecutor(frameworkId, executorId);
+ return;
}
- } else if (name() == process::TERMINATE || name() == process::EXITED) {
- return;
}
}
}
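
The process-based module above relies on process groups rather than containers for cleanup: the forked child calls setsid() so the executor and its descendants share a group whose id equals the child's pid, and killExecutor() later signals the whole group with killpg(). Below is a standalone sketch of that pattern; it is illustrative only, and the "sleep 60" command is a placeholder rather than the real ExecutorLauncher.

// Sketch only: launch a child in its own session, then kill its whole
// process group, mirroring the fork/setsid/killpg pattern above.
#include <sys/types.h>
#include <sys/wait.h>
#include <signal.h>
#include <unistd.h>

#include <cstdio>

int main()
{
  pid_t pid = fork();
  if (pid == -1) {
    perror("fork");
    return 1;
  }

  if (pid == 0) {
    // Child: detach into a new session so its pid is also its group id.
    if (setsid() == -1) {
      perror("setsid");
      _exit(1);
    }
    execlp("sleep", "sleep", "60", (char*) NULL);
    _exit(1);  // Only reached if execlp fails.
  }

  // Parent: give the child a moment to call setsid(); a real module
  // tracks executor startup explicitly instead of sleeping.
  sleep(1);

  // Kill everything in the executor's process group.
  if (killpg(pid, SIGKILL) == -1) {
    perror("killpg");
  }

  int status;
  waitpid(pid, &status, 0);
  printf("child %d exited, status %d\n", (int) pid, status);
  return 0;
}
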