You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:24:41 UTC
svn commit: r1131744 - in /incubator/mesos/trunk/src: master.cpp messages.hpp nexus_sched.cpp slave.cpp
Author: benh
Date: Sun Jun 5 05:24:41 2011
New Revision: 1131744
URL: http://svn.apache.org/viewvc?rev=1131744&view=rev
Log:
Getting code deployable with and without ZooKeeper.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/slave.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:24:41 2011
@@ -148,18 +148,25 @@ Master::Master(const string& _allocatorT
Master::~Master()
{
- if (isFT && masterDetector != NULL)
- delete masterDetector;
LOG(INFO) << "Shutting down master";
+
+ if (masterDetector != NULL) {
+ delete masterDetector;
+ masterDetector = NULL;
+ }
+
delete allocator;
+
foreachpair (_, Framework *framework, frameworks) {
foreachpair(_, TaskInfo *task, framework->tasks)
delete task;
delete framework;
}
+
foreachpair (_, Slave *slave, slaves) {
delete slave;
}
+
foreachpair (_, SlotOffer *offer, slotOffers) {
delete offer;
}
@@ -281,9 +288,27 @@ void Master::operator () ()
{
LOG(INFO) << "Master started at nexus://" << self();
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
-
+ if (isFT) {
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
+ } else {
+ send(self(), pack<GOT_MASTER_SEQ>("0"));
+ }
+
+ // Don't do anything until we get a sequence identifier.
+ bool waitingForSeq = true;
+ do {
+ switch (receive()) {
+ case GOT_MASTER_SEQ: {
+ string mySeq;
+ unpack<GOT_MASTER_SEQ>(mySeq);
+ masterId = lexical_cast<long>(mySeq);
+ LOG(INFO) << "Master ID:" << masterId;
+ break;
+ }
+ }
+ } while (waitingForSeq);
+
allocator = createAllocator();
if (!allocator)
LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
@@ -294,15 +319,6 @@ void Master::operator () ()
while (true) {
switch (receive()) {
- case GOT_MASTER_SEQ: {
- // TODO(benh|alig): NEED TO GET SEQ BEFORE ANYONE ELSE CONNECTS!
- string mySeq;
- unpack<GOT_MASTER_SEQ>(mySeq);
- masterId = lexical_cast<long>(mySeq);
- LOG(INFO) << "Master ID:" << masterId;
- break;
- }
-
case F2M_REGISTER_FRAMEWORK: {
FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 05:24:41 2011
@@ -15,6 +15,7 @@
#include "foreach.hpp"
#include "task_info.hpp"
+
namespace nexus { namespace internal {
enum MessageType {
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:24:41 2011
@@ -76,9 +76,8 @@ private:
MasterDetector *masterDetector;
FTMessaging *ftMsg;
- typedef unordered_map< SlaveID, PID > SidToPidMap;
- unordered_map< OfferID, SidToPidMap > savedOffers;
- SidToPidMap sidToPidMap;
+ unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+ unordered_map<SlaveID, PID> savedSlavePids;
volatile bool terminate;
@@ -114,7 +113,8 @@ public:
terminate(false),
frameworkName(_frameworkName),
execInfo(_execInfo),
- masterDetector(NULL)
+ masterDetector(NULL),
+ ftMsg(FTMessaging::getInstance())
{
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
if (urlPair.first == UrlProcessor::ZOO) {
@@ -129,9 +129,15 @@ public:
cerr << "Failed to parse URL for master: " << _master <<endl;
exit(1);
}
- }
+ }
+}
- ftMsg = FTMessaging::getInstance();
+~SchedulerProcess()
+{
+ if (masterDetector != NULL) {
+ delete masterDetector;
+ masterDetector = NULL;
+ }
}
protected:
@@ -143,8 +149,12 @@ protected:
fatal("failed to get username information");
string user(passwd->pw_name);
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+ if (isFT) {
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+ } else {
+ send(self(), pack<NEW_MASTER_DETECTED>(0, master));
+ }
while(true) {
// Rather than send a message to this process when it is time to
@@ -165,7 +175,8 @@ protected:
switch(receive(FT_TIMEOUT)) {
// TODO(benh): We need to break the receive loop every so often
// to check if 'terminate' has been set .. but rather than use a
- // timeout in receive, maybe we should send a message.
+ // timeout in receive, it would be nice to send a message, but
+ // see above.
case NEW_MASTER_DETECTED: {
string masterSeq;
@@ -206,9 +217,10 @@ protected:
vector<SlaveOffer> offs;
unpack<M2F_SLOT_OFFER>(oid, offs);
- SidToPidMap &tmpMap = savedOffers[ oid ];
+ // Save all the slave PIDs found in the offer so later we can
+ // send framework messages directly.
foreach(const SlaveOffer &offer, offs) {
- tmpMap[ offer.slaveId ] = offer.slavePid;
+ savedOffers[oid][offer.slaveId] = offer.slavePid;
}
invoke(bind(&Scheduler::resourceOffer, sched, driver, oid, ref(offs)));
@@ -221,27 +233,29 @@ protected:
Params params;
unpack<F2F_SLOT_OFFER_REPLY>(oid, tasks, params);
- // Save only the PIDs we need for sending framework messages.
+ // Keep only the slave PIDs where we run tasks so we can send
+ // framework messages directly.
foreach(const TaskDescription &task, tasks) {
- sidToPidMap[ task.slaveId ] = savedOffers[ oid ][ task.slaveId ];
-
+ savedSlavePids[task.slaveId] = savedOffers[oid][task.slaveId];
}
- savedOffers.erase( oid );
- // TODO(alig|benh): Use new API -> rsend(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
- // TODO(alig): Improve the following comment.
- // Do a reliable send here because ...
+ // Remove the offer since we saved all the PIDs we might use.
+ savedOffers.erase(oid);
+
+ // TODO(alig|benh): Walk through scenario if the master dies
+ // after it sends out M2S_RUN_TASK messages?
+
if (isFT) {
- TimeoutListener *tListener =
- new TimeoutListener(this, tasks);
+ TimeoutListener *tListener = new TimeoutListener(this, tasks);
string ftId = ftMsg->getNextId();
DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
- ftMsg->reliableSend( ftId,
- pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
- tListener);
- } else
+ ftMsg->reliableSend(ftId,
+ pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
+ tListener);
+ } else {
send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
+ }
break;
}
@@ -249,15 +263,12 @@ protected:
case F2F_FRAMEWORK_MESSAGE: {
FrameworkMessage msg;
unpack<F2F_FRAMEWORK_MESSAGE>(msg);
-
- /*
- if (isFT) {
- string ftId = ftMsg->getNextId();
- ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
- } else
- send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
- */
- send( sidToPidMap[ msg.slaveId ], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
+// if (isFT) {
+// string ftId = ftMsg->getNextId();
+// ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+// } else
+// send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
+ send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
break;
}
@@ -284,7 +295,6 @@ protected:
break;
}
-
case M2F_STATUS_UPDATE: {
TaskID tid;
TaskState state;
@@ -319,7 +329,7 @@ protected:
case M2F_LOST_SLAVE: {
SlaveID sid;
unpack<M2F_LOST_SLAVE>(sid);
- sidToPidMap.erase(sid);
+ savedSlavePids.erase(sid);
invoke(bind(&Scheduler::slaveLost, sched, driver, sid));
break;
}
@@ -333,14 +343,12 @@ protected:
}
case PROCESS_EXIT: {
- const char* message = "Connection to master failed";
- if (isFT)
- LOG(WARNING) << "Connection to master failed. Waiting for a new master to be elected.";
- else
- {
- LOG(ERROR) << "Connection to master failed. Exiting. Consider running Nexus in FT mode!";
- invoke(bind(&Scheduler::error, sched, driver, -1, message));
- }
+ if (isFT) {
+ LOG(WARNING) << "Connection to master lost .. waiting for new master.";
+ } else {
+ const char* message = "Connection to master failed";
+ invoke(bind(&Scheduler::error, sched, driver, -1, message));
+ }
break;
}
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:24:41 2011
@@ -107,7 +107,10 @@ Slave::Slave(const string &_master, Reso
Slave::~Slave()
{
- delete masterDetector;
+ if (masterDetector != NULL) {
+ delete masterDetector;
+ masterDetector = NULL;
+ }
}
@@ -141,8 +144,12 @@ void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+ if (isFT) {
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+ } else {
+ send(self(), pack<NEW_MASTER_DETECTED>(0, master));
+ }
// Get our hostname
char buf[256];
@@ -365,13 +372,11 @@ void Slave::operator () ()
unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
// Set slave ID in case framework omitted it
message.slaveId = this->id;
- /*
- if (isFT) {
- string ftId = ftMsg->getNextId();
- ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
- } else
- send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
- */
+// if (isFT) {
+// string ftId = ftMsg->getNextId();
+// ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
+// } else
+// send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
break;
}
@@ -386,16 +391,15 @@ void Slave::operator () ()
if (from() == master) {
// TODO: Fault tolerance!
- if (isFT)
- LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected.";
- else
- {
- LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
- if (isolationModule != NULL)
- delete isolationModule;
- // TODO: Shut down executors?
- return;
- }
+ if (isFT) {
+ LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected.";
+ } else {
+ LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
+ if (isolationModule != NULL)
+ delete isolationModule;
+ // TODO: Shut down executors?
+ return;
+ }
}
foreachpair (_, Executor *ex, executors) {