You are viewing a plain-text version of this content. The canonical link for it (a hyperlink on the word "here" in the original page) is not preserved in this plain-text version.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:34:19 UTC
svn commit: r1131794 - in /incubator/mesos/trunk: include/ src/ src/tests/ src/third_party/libprocess/
Author: benh
Date: Sun Jun 5 05:34:18 2011
New Revision: 1131794
URL: http://svn.apache.org/viewvc?rev=1131794&view=rev
Log:
Factoring out MasterDetector and basic implementation of scheduler failover.
Modified:
incubator/mesos/trunk/include/nexus_sched.hpp
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/local.cpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/master.hpp
incubator/mesos/trunk/src/master_detector.cpp
incubator/mesos/trunk/src/master_detector.hpp
incubator/mesos/trunk/src/master_main.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_local.cpp
incubator/mesos/trunk/src/nexus_local.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/process_based_isolation_module.cpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/slave.hpp
incubator/mesos/trunk/src/slave_main.cpp
incubator/mesos/trunk/src/tests/test_master.cpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp
incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp
incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
incubator/mesos/trunk/src/url_processor.cpp
incubator/mesos/trunk/src/zookeeper.hpp
Modified: incubator/mesos/trunk/include/nexus_sched.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus_sched.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus_sched.hpp (original)
+++ incubator/mesos/trunk/include/nexus_sched.hpp Sun Jun 5 05:34:18 2011
@@ -11,7 +11,7 @@ namespace nexus {
class SchedulerDriver;
-namespace internal { class SchedulerProcess; }
+namespace internal { class SchedulerProcess; class MasterDetector; }
/**
@@ -77,7 +77,10 @@ public:
class NexusSchedulerDriver : public SchedulerDriver
{
public:
- NexusSchedulerDriver(Scheduler* sched, const std::string& master);
+ NexusSchedulerDriver(Scheduler* sched,
+ const std::string& url,
+ FrameworkID fid = "");
+
virtual ~NexusSchedulerDriver();
// Lifecycle methods
@@ -102,12 +105,16 @@ private:
// Internal utility method to report an error to the scheduler
void error(int code, const std::string& message);
- std::string master;
Scheduler* sched;
+ std::string url;
+ FrameworkID fid;
// LibProcess process for communicating with master
internal::SchedulerProcess* process;
+ // Coordination between masters
+ internal::MasterDetector* detector;
+
// Are we currently registered with the master
bool running;
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 05:34:18 2011
@@ -53,11 +53,9 @@ LDFLAGS += -Lthird_party/libprocess
# Add libev to LDFLAGS.
LDFLAGS += -L$(LIBEV)/.libs
-# Add glog and gtest to include and lib paths
-CXXFLAGS += -I$(GLOG)/src
-CXXFLAGS += -I$(GTEST)/include
-LDFLAGS += -L$(GLOG)/.libs
-LDFLAGS += -L$(GTEST)/lib/.libs
+# Add glog and gtest to include and lib paths.
+CXXFLAGS += -I$(GLOG)/src -I$(GTEST)/include
+LDFLAGS += -L$(GLOG)/.libs -L$(GTEST)/lib/.libs
# Add local ZooKeeper to include and lib paths if necessary.
ifeq ($(LOCAL_ZOOKEEPER),yes)
@@ -102,7 +100,7 @@ NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(
MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
process_based_isolation_module.o
-COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o ft_messaging.o zookeeper.o
+COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o master_detector.o url_processor.o zookeeper.o
EXEC_LIB_OBJ = nexus_exec.o
SCHED_LIB_OBJ = nexus_sched.o nexus_local.o params.o
TEST_OBJ = tests/main.o tests/test_master.o tests/test_resources.o
Modified: incubator/mesos/trunk/src/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local.cpp (original)
+++ incubator/mesos/trunk/src/local.cpp Sun Jun 5 05:34:18 2011
@@ -9,6 +9,8 @@ using std::cerr;
using std::endl;
using std::string;
+using namespace nexus::internal;
+
int main (int argc, char **argv)
{
@@ -60,7 +62,7 @@ int main (int argc, char **argv)
}
}
- const PID &master = run_nexus(slaves, cpus, mem, true, quiet);
+ const PID &master = local::launch(slaves, cpus, mem, true, quiet);
Process::wait(master);
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:34:18 2011
@@ -109,49 +109,15 @@ public:
}
-Master::Master(const string &zk)
- : isFT(false), masterDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
- nextSlotOfferId(0), allocatorType("simple"), masterId(0)
-{
- if (zk != "") {
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
- exit(1);
- }
- }
-}
-
-
-Master::Master(const string& _allocatorType, const string &zk)
- : isFT(false), masterDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
- nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
-{
- if (zk != "") {
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
- exit(1);
- }
- }
-}
+Master::Master(const string& _allocatorType)
+ : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
+ allocatorType(_allocatorType), masterId(0) {}
Master::~Master()
{
LOG(INFO) << "Shutting down master";
- if (masterDetector != NULL) {
- delete masterDetector;
- masterDetector = NULL;
- }
-
delete allocator;
foreachpair (_, Framework *framework, frameworks) {
@@ -175,7 +141,7 @@ state::MasterState * Master::getState()
std::ostringstream oss;
oss << self();
state::MasterState *state =
- new state::MasterState(BUILD_DATE, BUILD_USER, oss.str(), isFT);
+ new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
foreachpair (_, Slave *s, slaves) {
state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
@@ -285,28 +251,22 @@ void Master::operator () ()
{
LOG(INFO) << "Master started at nexus://" << self();
- if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
- } else {
- send(self(), pack<GOT_MASTER_SEQ>("0"));
- }
-
- // Don't do anything until we get a sequence identifier.
- bool waitingForSeq = true;
- do {
- switch (receive()) {
- case GOT_MASTER_SEQ: {
- string mySeq;
- unpack<GOT_MASTER_SEQ>(mySeq);
- masterId = lexical_cast<long>(mySeq);
- LOG(INFO) << "Master ID:" << masterId;
- waitingForSeq = false;
- break;
- }
+ // Don't do anything until we get an identifier.
+ while (true) {
+ if (receive() == GOT_MASTER_ID) {
+ string id;
+ unpack<GOT_MASTER_ID>(id);
+ masterId = lexical_cast<long>(id);
+ LOG(INFO) << "Master ID:" << masterId;
+ break;
+ } else {
+ LOG(INFO) << "Oops! We're dropping a message since "
+ << "we haven't received an identifier yet!";
}
- } while (waitingForSeq);
+ }
+ // Create the allocator (we do this after the constructor because it
+ // leaks 'this').
allocator = createAllocator();
if (!allocator)
LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
@@ -321,18 +281,18 @@ void Master::operator () ()
// TODO(benh): We might have been the master, but then got
// partitioned, and now we are finding out once we reconnect
// that we are no longer the master, so we should just die.
- LOG(INFO) << "new master detected ... maybe it's us!";
+ LOG(INFO) << "New master detected ... maybe it's us!";
break;
}
case NO_MASTER_DETECTED: {
- LOG(INFO) << "no master detected ... maybe we're next!";
+ LOG(INFO) << "No master detected ... maybe we're next!";
break;
}
case F2M_REGISTER_FRAMEWORK: {
- FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
-
+ FrameworkID fid = lexical_cast<string>(masterId) + "-"
+ + lexical_cast<string>(nextFrameworkId++);
Framework *framework = new Framework(from(), fid);
unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
framework->user,
@@ -349,19 +309,32 @@ void Master::operator () ()
}
case F2M_REREGISTER_FRAMEWORK: {
-
Framework *framework = new Framework(from());
+ bool failover;
unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
framework->name,
framework->user,
- framework->executorInfo);
+ framework->executorInfo,
+ failover);
if (framework->id == "") {
- DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
- framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+ LOG(ERROR) << "Framework reconnect/failover without an id!";
+ send(framework->pid, pack<M2F_ERROR>(1, "Missing framework id"));
+ break;
+ }
+
+ LOG(INFO) << "Reregistering " << framework << " at " << framework->pid;
+
+ if (frameworks[framework->id] != NULL) {
+ if (failover) {
+ terminateFramework(frameworks[framework->id], 1, "Failover");
+ } else {
+ LOG(INFO) << "Framework reregistering with an already used id!";
+ send(framework->pid, pack<M2F_ERROR>(1, "Framework id in use"));
+ break;
+ }
}
- LOG(INFO) << "Registering " << framework << " at " << framework->pid;
frameworks[framework->id] = framework;
pidToFid[framework->pid] = framework->id;
@@ -372,12 +345,6 @@ void Master::operator () ()
allocator->frameworkAdded(framework);
if (framework->executorInfo.uri == "")
terminateFramework(framework, 1, "No executor URI given");
-
- timeval tv;
- gettimeofday(&tv, NULL);
-
- DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-
break;
}
@@ -473,8 +440,11 @@ void Master::operator () ()
slave->addTask(tip);
updateFrameworkTasks(tip);
}
+
+ // TODO(benh|alig): We should put a timeout on how long we keep
+ // tasks running that never have frameworks reregister that
+ // claim them.
- //alibandali
LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
slaves[slave->id] = slave;
pidToSid[slave->pid] = slave->id;
@@ -511,15 +481,13 @@ void Master::operator () ()
DLOG(INFO) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
if (Slave *slave = lookupSlave(sid)) {
if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework
-
+ // Pass on the status update to the framework.
forward(framework->pid);
-
if (duplicate()) {
LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
break;
- }
- // Update the task state locally
+ }
+ // Update the task state locally.
Task *task = slave->lookupTask(fid, tid);
if (task != NULL) {
LOG(INFO) << "Status update: " << task << " is in state " << state;
Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun 5 05:34:18 2011
@@ -29,7 +29,7 @@
#include "resources.hpp"
#include "master_detector.hpp"
#include "task.hpp"
-#include "url_processor.hpp"
+
namespace nexus { namespace internal { namespace master {
@@ -263,9 +263,6 @@ enum TaskRemovalReason
class Master : public Tuple<ReliableProcess>
{
protected:
- bool isFT;
- string zkServers;
- MasterDetector *masterDetector;
unordered_map<FrameworkID, Framework *> frameworks;
unordered_map<SlaveID, Slave *> slaves;
unordered_map<OfferID, SlotOffer *> slotOffers;
@@ -277,16 +274,13 @@ protected:
long nextSlaveId; // Used to give each slave a unique ID.
long nextSlotOfferId; // Used to give each slot offer a unique ID.
-
string allocatorType;
Allocator *allocator;
long masterId; // Used to differentiate different master in FT mode, will be ephemeral id
public:
- Master(const string &zk = "");
-
- Master(const string& _allocatorType, const string &zk = "");
+ Master(const string& _allocatorType = "simple");
~Master();
Modified: incubator/mesos/trunk/src/master_detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_detector.cpp (original)
+++ incubator/mesos/trunk/src/master_detector.cpp Sun Jun 5 05:34:18 2011
@@ -11,34 +11,277 @@
#include <boost/lexical_cast.hpp>
+#include "config.hpp"
#include "fatal.hpp"
+#include "foreach.hpp"
#include "master_detector.hpp"
#include "messages.hpp"
+#include "url_processor.hpp"
+#include "zookeeper.hpp"
using namespace nexus;
using namespace nexus::internal;
+using namespace std;
+
using boost::lexical_cast;
-MasterDetector::MasterDetector(const string &_servers, const string &_znode,
- const PID &_pid, bool _contend)
+class ZooKeeperMasterDetector : public MasterDetector, public Watcher
+{
+public:
+ /**
+ * Uses ZooKeeper for both detecting masters and contending to be a
+ * master.
+ *
+ * @param server comma separated list of server host:port pairs
+ *
+ * @param znode top-level "ZooKeeper node" (directory) to use
+ * @param pid libprocess pid to send messages/updates to (and to
+ * use for contending to be a master)
+ * @param contend true if should contend to be master (not needed
+ * for slaves and frameworks)
+ * @param quiet verbosity logging level for undelying ZooKeeper library
+ */
+ ZooKeeperMasterDetector(const std::string &servers,
+ const std::string &znode,
+ const PID &pid,
+ bool contend = false,
+ bool quiet = false);
+
+ virtual ~ZooKeeperMasterDetector();
+
+ /**
+ * ZooKeeper watcher callback.
+ */
+ virtual void process(ZooKeeper *zk, int type, int state,
+ const std::string &path);
+
+ /**
+ * @return unique id of the current master
+ */
+ virtual std::string getCurrentMasterId();
+
+ /**
+ * @return libprocess PID of the current master
+ */
+ virtual PID getCurrentMasterPID();
+
+private:
+ /**
+ * TODO(alig): Comment this object.
+ */
+ void setId(const std::string &s);
+
+ /**
+ * TODO(alig): Comment this object.
+ */
+ std::string getId();
+
+ /**
+ * TODO(alig): Comment this object.
+ */
+ void detectMaster();
+
+ /**
+ * TODO(alig): Comment this object.
+ */
+ PID lookupMasterPID(const std::string &seq) const;
+
+ std::string servers;
+ std::string znode;
+ PID pid;
+ bool contend;
+ bool reconnect;
+
+ ZooKeeper *zk;
+
+ // Our sequence string if contending to be a master.
+ std::string mySeq;
+
+ std::string currentMasterSeq;
+ PID currentMasterPID;
+};
+
+
+
+MasterDetector::~MasterDetector() {}
+
+
+MasterDetector * MasterDetector::create(const std::string &url,
+ const PID &pid,
+ bool contend,
+ bool quiet)
+{
+ if (url == "")
+ if (contend)
+ return new BasicMasterDetector(pid);
+ else
+ fatal("cannot use specified url to detect master");
+
+ MasterDetector *detector = NULL;
+
+ // Parse the url.
+ pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(url);
+
+ switch (urlPair.first) {
+ // ZooKeeper URL.
+ case UrlProcessor::ZOO: {
+ const string &servers = urlPair.second;
+ detector = new ZooKeeperMasterDetector(servers, ZNODE, pid, contend, quiet);
+ break;
+ }
+
+ // Nexus URL or libprocess pid.
+ case UrlProcessor::NEXUS:
+ case UrlProcessor::UNKNOWN: {
+ if (contend) {
+ // TODO(benh): Wierdnesses like this makes it seem like there
+ // should be a separate elector and detector. In particular,
+ // it doesn't make sense to pass a libprocess pid and attempt
+ // to contend (at least not right now).
+ fatal("cannot contend to be a master with specified url");
+ } else {
+ PID master = make_pid(urlPair.second.c_str());
+ if (!master)
+ fatal("cannot use specified url to detect master");
+ detector = new BasicMasterDetector(master, pid);
+ }
+ break;
+ }
+ }
+
+ return detector;
+}
+
+
+void MasterDetector::destroy(MasterDetector *detector)
+{
+ if (detector != NULL)
+ delete detector;
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master)
+ : master(_master)
+{
+ // Send a master id.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+ Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+ }
+
+ // Elect the master.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+ Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+ }
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master,
+ const PID &pid,
+ bool elect)
+ : master(_master)
+{
+ if (elect) {
+ // Send a master id.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+ Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+ }
+
+ // Elect the master.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+ Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+ }
+ }
+
+ // Tell the pid about the master.
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+ Process::post(pid, NEW_MASTER_DETECTED, s.data(), s.size());
+}
+
+
+BasicMasterDetector::BasicMasterDetector(const PID &_master,
+ const vector<PID> &pids,
+ bool elect)
+ : master(_master)
+{
+ if (elect) {
+ // Send a master id.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>("0"));
+ Process::post(master, GOT_MASTER_ID, s.data(), s.size());
+ }
+
+ // Elect the master.
+ {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+ Process::post(master, NEW_MASTER_DETECTED, s.data(), s.size());
+ }
+ }
+
+ // Tell each pid about the master.
+ foreach (const PID &pid, pids) {
+ const string &s =
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<NEW_MASTER_DETECTED>("0", master));
+ Process::post(pid, NEW_MASTER_DETECTED, s.data(), s.size());
+ }
+}
+
+
+BasicMasterDetector::~BasicMasterDetector() {}
+
+
+string BasicMasterDetector::getCurrentMasterId()
+{
+ return "0";
+}
+
+
+PID BasicMasterDetector::getCurrentMasterPID()
+{
+ return master;
+}
+
+
+ZooKeeperMasterDetector::ZooKeeperMasterDetector(const string &_servers,
+ const string &_znode,
+ const PID &_pid,
+ bool _contend,
+ bool quiet)
: servers(_servers), znode(_znode), pid(_pid),
contend(_contend), reconnect(false)
{
+ // Set verbosity level for underlying ZooKeeper library logging.
+ // TODO(benh): Put this in the C++ API.
+ zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
+
+ // Start up the ZooKeeper connection!
zk = new ZooKeeper(servers, 10000, this);
}
-MasterDetector::~MasterDetector()
+ZooKeeperMasterDetector::~ZooKeeperMasterDetector()
{
- if (zk != NULL)
+ if (zk != NULL) {
delete zk;
+ zk = NULL;
+ }
}
-void MasterDetector::process(ZooKeeper *zk, int type, int state,
- const string &path)
+void ZooKeeperMasterDetector::process(ZooKeeper *zk, int type, int state,
+ const string &path)
{
int ret;
string result;
@@ -85,12 +328,12 @@ void MasterDetector::process(ZooKeeper *
"Make sure ZooKeeper is running on: %s",
zk->error(ret), servers.c_str());
- setMySeq(result);
- LOG(INFO) << "Created ephemeral/sequence:" << getMySeq();
+ setId(result);
+ LOG(INFO) << "Created ephemeral/sequence:" << getId();
const string &s =
- Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_SEQ>(getMySeq()));
- Process::post(pid, GOT_MASTER_SEQ, s.data(), s.size());
+ Tuple<Process>::tupleToString(Tuple<Process>::pack<GOT_MASTER_ID>(getId()));
+ Process::post(pid, GOT_MASTER_ID, s.data(), s.size());
}
// Now determine who the master is (it may be us).
@@ -139,7 +382,25 @@ void MasterDetector::process(ZooKeeper *
}
-void MasterDetector::detectMaster()
+void ZooKeeperMasterDetector::setId(const string &s)
+{
+ string seq = s;
+ // Converts "/path/to/znode/000000131" to "000000131".
+ int pos;
+ if ((pos = seq.find_last_of('/')) != string::npos) {
+ mySeq = seq.erase(0, pos + 1);
+ } else
+ mySeq = "";
+}
+
+
+string ZooKeeperMasterDetector::getId()
+{
+ return mySeq;
+}
+
+
+void ZooKeeperMasterDetector::detectMaster()
{
vector<string> results;
@@ -184,7 +445,7 @@ void MasterDetector::detectMaster()
}
-PID MasterDetector::lookupMasterPID(const string &seq) const
+PID ZooKeeperMasterDetector::lookupMasterPID(const string &seq) const
{
CHECK(!seq.empty());
@@ -203,11 +464,13 @@ PID MasterDetector::lookupMasterPID(cons
}
-string MasterDetector::getCurrentMasterSeq() const {
+string ZooKeeperMasterDetector::getCurrentMasterId()
+{
return currentMasterSeq;
}
-PID MasterDetector::getCurrentMasterPID() const {
+PID ZooKeeperMasterDetector::getCurrentMasterPID()
+{
return currentMasterPID;
}
Modified: incubator/mesos/trunk/src/master_detector.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_detector.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_detector.hpp (original)
+++ incubator/mesos/trunk/src/master_detector.hpp Sun Jun 5 05:34:18 2011
@@ -7,100 +7,100 @@
#include <climits>
#include <cstdlib>
-#include "zookeeper.hpp"
-
-using namespace std;
+namespace nexus { namespace internal {
/**
- * Implements functionality for a) detecting masters b) contending to be a master.
+ * Implements functionality for:
+ * a) detecting masters
+ * b) contending to be a master
*/
-class MasterDetector : public Watcher {
+class MasterDetector
+{
+public:
+ virtual ~MasterDetector() = 0;
+
+ /**
+ * Creates a master detector that, given the specified url, knows
+ * how to connect to the master, or contend to be a master. The
+ * master detector sends messages to the specified pid when a new
+ * master is elected, a master is lost, etc.
+ *
+ * @param url string possibly containing zoo://, zoofile://, nexus://
+ * @param pid libprocess pid to both receive our messages and be
+ * used if we should contend
+ * @param contend true if should contend to be master
+ * @return instance of MasterDetector
+ */
+ static MasterDetector * create(const std::string &url,
+ const PID &pid,
+ bool contend = false,
+ bool quiet = true);
+
+ /**
+ * Cleans up and deallocates the detector.
+ */
+ static void destroy(MasterDetector *detector);
+
+ /**
+ * @return unique id of the current master
+ */
+ virtual std::string getCurrentMasterId() = 0;
+
+ /**
+ * @return libprocess PID of the current master
+ */
+ virtual PID getCurrentMasterPID() = 0;
+};
+
+
+class BasicMasterDetector : public MasterDetector
+{
public:
- /**
- * Connects to ZooKeeper and possibly contends for master.
- *
- * @param server comma separated list of zookeeper server host:port
- * pairs
- * @param znode ZooKeeper node (directory)
- * @param pid libprocess pid that we should send messages to (and to
- * contend)
- * @param contend true if should contend to be master (not needed
- * for slaves and frameworks)
- */
- MasterDetector(const string &server, const string &znode,
- const PID &_pid, bool contend = false);
-
- ~MasterDetector();
-
- /**
- * ZooKeeper watcher.
- */
- virtual void process(ZooKeeper *zk, int type, int state, const string &path);
-
- /**
- * @return ZooKeeper unique sequence number of the current master.
- */
- string getCurrentMasterSeq() const;
-
- /**
- * @return libprocess PID of the current master.
- */
- PID getCurrentMasterPID() const;
-
- /**
- * @return Unique ZooKeeper sequence number (only if contending for master, otherwise "").
- */
- string getMySeq() const
- {
- return mySeq;
- }
-
- /**
- * Adjusts ZooKeepers level of debugging output.
- * @param quiet true makes ZK quiet, whereas false makes ZK output DEBUG messages
- */
- static void setQuiet(bool quiet)
- {
- zoo_set_debug_level(quiet ? ZOO_LOG_LEVEL_ERROR : ZOO_LOG_LEVEL_DEBUG);
- }
-
-private:
- void setMySeq(const string &s)
- {
- string seq = s;
- // Converts "/path/to/znode/000000131" to "000000131".
- int pos;
- if ((pos = seq.find_last_of('/')) != string::npos) {
- mySeq = seq.erase(0, pos + 1);
- } else
- mySeq = "";
- }
-
- /*
- * TODO(alig): Comment this object.
- */
- void detectMaster();
-
- /*
- * TODO(alig): Comment this object.
- */
- PID lookupMasterPID(const string &seq) const;
-
- string servers;
- string znode;
- PID pid;
- bool contend;
- bool reconnect;
+ /**
+ * Create a new master detector where the specified pid contends to
+ * be the master and gets elected by default.
+ *
+ * @param master libprocess pid to send messages/updates and be the
+ * master
+ */
+ BasicMasterDetector(const PID &master);
+
+ /**
+ * Create a new master detector where the 'master' pid is
+ * the master (no contending).
+ *
+ * @param master libprocess pid to send messages/updates and be the
+ * master
+ * @param pid/pids libprocess pids to send messages/updates to regarding
+ * the master
+ * @param elect if true then contend and elect the specified master
+ */
+ BasicMasterDetector(const PID &master,
+ const PID &pid,
+ bool elect = false);
+
+ BasicMasterDetector(const PID &master,
+ const std::vector<PID> &pids,
+ bool elect = false);
- ZooKeeper *zk;
+ virtual ~BasicMasterDetector();
- // Our sequence number if contending.
- string mySeq;
+ /**
+ * @return unique id of the current master
+ */
+ virtual std::string getCurrentMasterId();
- string currentMasterSeq;
- PID currentMasterPID;
+ /**
+ * @return libprocess PID of the current master
+ */
+ virtual PID getCurrentMasterPID();
+
+private:
+ PID master;
};
+}} /* namespace nexus { namespace internal { */
+
#endif /* __MASTER_DETECTOR_HPP__ */
Modified: incubator/mesos/trunk/src/master_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master_main.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master_main.cpp (original)
+++ incubator/mesos/trunk/src/master_main.cpp Sun Jun 5 05:34:18 2011
@@ -3,7 +3,6 @@
#include "master.hpp"
#include "master_webui.hpp"
-#include "url_processor.hpp"
using std::cerr;
using std::endl;
@@ -14,14 +13,14 @@ using namespace nexus::internal::master;
void usage(const char* programName)
{
cerr << "Usage: " << programName
+ << " [--url URL]"
<< " [--port PORT]"
<< " [--allocator ALLOCATOR]"
- << " [--zookeeper ZOO_SERVERS]"
<< " [--quiet]" << endl
<< endl
- << "ZOO_SERVERS is a url of the form: "
- << "zoo://host1:port1,host2:port2,... or "
- << "zoofile://file where file contains a host:port pair per line"
+ << "URL (used for contending to be a master) may be one of:" << endl
+ << " zoo://host1:port1,host2:port2,..." << endl
+ << " zoofile://file where file contains a host:port pair per line"
<< endl;
}
@@ -34,31 +33,30 @@ int main (int argc, char **argv)
}
option options[] = {
+ {"url", required_argument, 0, 'u'},
{"allocator", required_argument, 0, 'a'},
{"port", required_argument, 0, 'p'},
- {"zookeeper", required_argument, 0, 'z'},
{"quiet", no_argument, 0, 'q'},
};
- bool isFT = false;
- string zookeeper = "";
- bool quiet = false;
+ string url = "";
string allocator = "simple";
+ string znode = ZNODE;
+ bool quiet = false;
int opt;
int index;
- while ((opt = getopt_long(argc, argv, "a:p:z:q", options, &index)) != -1) {
+ while ((opt = getopt_long(argc, argv, "u:a:p:q", options, &index)) != -1) {
switch (opt) {
+ case 'u':
+ url = optarg;
+ break;
case 'a':
allocator = optarg;
break;
case 'p':
setenv("LIBPROCESS_PORT", optarg, 1);
break;
- case 'z':
- isFT = true;
- zookeeper = optarg;
- break;
case 'q':
quiet = true;
break;
@@ -71,22 +69,22 @@ int main (int argc, char **argv)
}
}
- if (!quiet)
- google::SetStderrLogging(google::INFO);
- else if (isFT)
- MasterDetector::setQuiet(true);
-
+ // TODO(benh): Don't log to /tmp behind a sys admin's back!
FLAGS_log_dir = "/tmp";
FLAGS_logbufsecs = 1;
google::InitGoogleLogging(argv[0]);
+ if (!quiet)
+ google::SetStderrLogging(google::INFO);
+
LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
LOG(INFO) << "Starting Nexus master";
- if (isFT)
- LOG(INFO) << "Nexus in fault-tolerant mode";
- Master *master = new Master(allocator, zookeeper);
+
+ Master *master = new Master(allocator);
PID pid = Process::spawn(master);
+ MasterDetector *detector = MasterDetector::create(url, pid, true, quiet);
+
#ifdef NEXUS_WEBUI
if (chdir(dirname(argv[0])) != 0)
fatalerror("could not change into %s for running webui", dirname(argv[0]));
@@ -94,5 +92,8 @@ int main (int argc, char **argv)
#endif
Process::wait(pid);
+
+ MasterDetector::destroy(detector);
+
return 0;
}
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 05:34:18 2011
@@ -5,6 +5,7 @@
#include <string>
#include <vector>
+#include <reliable.hpp>
#include <tuple.hpp>
#include <nexus.hpp>
@@ -20,8 +21,9 @@ namespace nexus { namespace internal {
enum MessageType {
/* From framework to master. */
- F2M_REGISTER_FRAMEWORK = PROCESS_MSGID,
+ F2M_REGISTER_FRAMEWORK = RELIABLE_MSGID,
F2M_REREGISTER_FRAMEWORK,
+ F2M_FAILOVER_FRAMEWORK,
F2M_UNREGISTER_FRAMEWORK,
F2M_SLOT_OFFER_REPLY,
F2M_REVIVE_OFFERS,
@@ -55,9 +57,9 @@ enum MessageType {
SH2M_HEARTBEAT,
/* From master detector to processes */
+ GOT_MASTER_ID,
NEW_MASTER_DETECTED,
NO_MASTER_DETECTED,
- GOT_MASTER_SEQ,
/* From master to slave. */
M2S_REGISTER_REPLY,
@@ -125,7 +127,8 @@ TUPLE(F2M_REREGISTER_FRAMEWORK,
(std::string /*fid*/,
std::string /*name*/,
std::string /*user*/,
- ExecutorInfo));
+ ExecutorInfo,
+ bool /*failover*/));
TUPLE(F2M_UNREGISTER_FRAMEWORK,
(FrameworkID));
@@ -238,8 +241,8 @@ TUPLE(NEW_MASTER_DETECTED,
TUPLE(NO_MASTER_DETECTED,
());
-TUPLE(GOT_MASTER_SEQ,
- (std::string /* seq */));
+TUPLE(GOT_MASTER_ID,
+ (std::string /* id */));
TUPLE(M2S_REGISTER_REPLY,
(SlaveID));
Modified: incubator/mesos/trunk/src/nexus_local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.cpp (original)
+++ incubator/mesos/trunk/src/nexus_local.cpp Sun Jun 5 05:34:18 2011
@@ -15,16 +15,8 @@ using nexus::internal::slave::Slave;
using namespace nexus::internal;
-/* TODO(benh): Remove this dependency! */
-/* List of ZooKeeper host:port pairs. */
-std::string zookeeper = "";
-
-
namespace {
-static Master* currentMaster = NULL;
-static vector<Slave*>* currentSlaves = NULL;
-
static pthread_once_t glog_initialized = PTHREAD_ONCE_INIT;
@@ -35,47 +27,62 @@ void initialize_glog() {
} /* namespace { */
-PID run_nexus(int slaves, int32_t cpus, int64_t mem,
- bool ownLogging, bool quiet)
+namespace nexus { namespace internal { namespace local {
+
+static Master *master = NULL;
+static vector<Slave*> *slaves = NULL;
+static MasterDetector *detector = NULL;
+
+
+PID launch(int numSlaves, int32_t cpus, int64_t mem,
+ bool initLogging, bool quiet)
{
- if (currentMaster != NULL)
- fatal("Call to run_nexus while it is already running");
-
- if (ownLogging) {
+ if (master != NULL)
+ fatal("can only launch one local cluster at a time (for now)");
+
+ if (initLogging) {
pthread_once(&glog_initialized, initialize_glog);
if (!quiet)
google::SetStderrLogging(google::INFO);
}
- currentMaster = new Master();
+ master = new Master();
+ slaves = new vector<Slave*>();
- Process::spawn(currentMaster);
+ PID pid = Process::spawn(master);
- currentSlaves = new vector<Slave*>();
+ vector<PID> pids;
- for (int i = 0; i < slaves; i++) {
- Slave* s = new Slave(currentMaster->getPID(), Resources(cpus, mem), true);
- currentSlaves->push_back(s);
- Process::spawn(s);
+ for (int i = 0; i < numSlaves; i++) {
+ Slave* slave = new Slave(Resources(cpus, mem), true);
+ slaves->push_back(slave);
+ pids.push_back(Process::spawn(slave));
}
- return currentMaster->getPID();
+ detector = new BasicMasterDetector(pid, pids, true);
+
+ return pid;
}
-void kill_nexus()
+void shutdown()
{
- Process::post(currentMaster->getPID(), M2M_SHUTDOWN);
+ Process::post(master->getPID(), M2M_SHUTDOWN);
- Process::wait(currentMaster->getPID());
- delete currentMaster;
- currentMaster = NULL;
-
- for (int i = 0; i < currentSlaves->size(); i++) {
- Process::wait(currentSlaves->at(i));
- delete currentSlaves->at(i);
+ Process::wait(master->getPID());
+ delete master;
+ master = NULL;
+
+ for (int i = 0; i < slaves->size(); i++) {
+ Process::wait(slaves->at(i));
+ delete slaves->at(i);
}
- delete currentSlaves;
- currentSlaves = NULL;
+ delete slaves;
+ slaves = NULL;
+
+ delete detector;
+ detector = NULL;
}
+
+}}} /* namespace nexus { namespace internal { namespace local { */
Modified: incubator/mesos/trunk/src/nexus_local.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_local.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_local.hpp (original)
+++ incubator/mesos/trunk/src/nexus_local.hpp Sun Jun 5 05:34:18 2011
@@ -6,8 +6,13 @@
#include "master.hpp"
#include "slave.hpp"
-PID run_nexus(int slaves, int32_t cpus, int64_t mem,
- bool ownLogging, bool quiet);
-void kill_nexus();
+namespace nexus { namespace internal { namespace local {
+
+PID launch(int numSlaves, int32_t cpus, int64_t mem,
+ bool initLogging, bool quiet);
+
+void shutdown();
+
+}}}
#endif /* LOCAL_HPP */
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:34:18 2011
@@ -26,7 +26,6 @@
#include "messages.hpp"
#include "nexus_local.hpp"
#include "nexus_sched.hpp"
-#include "url_processor.hpp"
#include "master_detector.hpp"
#define REPLY_TIMEOUT 20
@@ -48,14 +47,6 @@ using namespace nexus::internal;
namespace nexus { namespace internal {
-/**
- * TODO(benh): Update this comment.
- * Scheduler process, responsible for interacting with the master
- * and responding to Nexus API calls from schedulers. In order to
- * allow a message to be sent back to the master we allow friend
- * functions to invoke 'send'. Therefore, care must be done to insure
- * any synchronization necessary is performed.
- */
class RbReply : public Tuple<Process>
{
@@ -94,127 +85,107 @@ private:
const TaskID tid;
};
+
+/**
+ * TODO(benh): Update this comment.
+ * Scheduler process, responsible for interacting with the master
+ * and responding to Nexus API calls from schedulers. In order to
+ * allow a message to be sent back to the master we allow friend
+ * functions to invoke 'send'. Therefore, care must be done to insure
+ * any synchronization necessary is performed.
+ */
class SchedulerProcess : public Tuple<ReliableProcess>
{
public:
friend class nexus::NexusSchedulerDriver;
private:
- PID master;
NexusSchedulerDriver* driver;
Scheduler* sched;
FrameworkID fid;
string frameworkName;
ExecutorInfo execInfo;
- bool isFT;
- string zkServers;
- MasterDetector *masterDetector;
- unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
- unordered_map<SlaveID, PID> savedSlavePids;
+ PID master;
volatile bool terminate;
+ unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+ unordered_map<SlaveID, PID> savedSlavePids;
+
unordered_map<TaskID, RbReply *> rbReplies;
public:
- SchedulerProcess(const string &_master,
- NexusSchedulerDriver* _driver,
+ SchedulerProcess(NexusSchedulerDriver* _driver,
Scheduler* _sched,
+ FrameworkID _fid,
const string& _frameworkName,
const ExecutorInfo& _execInfo)
: driver(_driver),
sched(_sched),
- fid(""),
- terminate(false),
+ fid(_fid),
frameworkName(_frameworkName),
execInfo(_execInfo),
- masterDetector(NULL)
-{
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- // } else if (urlPair.first == UrlProcessor::NEXUS) {
- } else {
- isFT = false;
- if (urlPair.first == UrlProcessor::NEXUS)
- master = make_pid(urlPair.second.c_str());
- else
- master = make_pid(_master.c_str());
-
- if (!master)
- {
- cerr << "Scheduler failed to resolve master PID " << urlPair.second << endl;
- exit(1);
- }
- }
-}
+ master(PID()),
+ terminate(false) {}
-~SchedulerProcess()
-{
- if (masterDetector != NULL) {
- delete masterDetector;
- masterDetector = NULL;
- }
-}
+ ~SchedulerProcess() {}
protected:
void operator () ()
{
- // Get username of current user
+ // Get username of current user.
struct passwd* passwd;
if ((passwd = getpwuid(getuid())) == NULL)
fatal("failed to get username information");
- string user(passwd->pw_name);
- if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
- } else {
- send(self(), pack<NEW_MASTER_DETECTED>("0", master));
- }
+ const string user(passwd->pw_name);
while(true) {
// Rather than send a message to this process when it is time to
- // complete, we set a flag that gets re-read. Sending a message
+ // terminate, we set a flag that gets re-read. Sending a message
// requires some sort of matching or priority reads that
// libprocess currently doesn't support. Note that this field is
- // only read by this process (after setting it in the
- // destructor), so we don't need to protect it in any way. In
- // fact, using a lock to protect it (or for providing atomicity
- // for cleanup, for example), might lead to deadlock with the
- // client code because we already use a lock in SchedulerDriver. That
- // being said, for now we make terminate 'volatile' to guarantee
- // that each read is getting a fresh copy.
+ // only read by this process, so we don't need to protect it in
+ // any way. In fact, using a lock to protect it (or for
+ // providing atomicity for cleanup, for example), might lead to
+ // deadlock with the client code because we already use a lock
+ // in SchedulerDriver. That being said, for now we make
+ // terminate 'volatile' to guarantee that each read is getting a
+ // fresh copy.
// TODO(benh): Do a coherent read so as to avoid using 'volatile'.
if (terminate)
return;
+ // TODO(benh): We need to break the receive every so often to
+ // check if 'terminate' has been set. It would be better to just
+ // send a message rather than have a timeout (see the comment
+ // above for why sending a message will still require us to use
+ // the terminate flag).
switch(receive(2)) {
- // TODO(benh): We need to break the receive loop every so often
- // to check if 'terminate' has been set .. but rather than use a
- // timeout in receive, it would be nice to send a message, but
- // see above.
case NEW_MASTER_DETECTED: {
string masterSeq;
PID masterPid;
unpack<NEW_MASTER_DETECTED>(masterSeq, masterPid);
- LOG(INFO) << "New master at " << masterPid
- << " with ephemeral id:" << masterSeq;
+ LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+
+ // Connect as a failover if this is the first master we are
+ // being told about a master AND we already have an id.
+ bool failover = !master && fid != "";
redirect(master, masterPid);
master = masterPid;
link(master);
- if (fid.empty()) {
+ if (fid == "") {
// Touched for the very first time.
- send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
+ send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
} else {
- // Not the first time.
- send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+ // Not the first time, or failing over.
+ send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user,
+ execInfo, failover));
}
break;
}
@@ -260,17 +231,13 @@ protected:
// Remove the offer since we saved all the PIDs we might use.
savedOffers.erase(oid);
- if (isFT) {
- foreach(const TaskDescription &task, tasks) {
- RbReply *rr = new RbReply(self(), task.taskId);
- rbReplies[task.taskId] = rr;
- link(spawn(rr));
- }
- }
+ foreach(const TaskDescription &task, tasks) {
+ RbReply *rr = new RbReply(self(), task.taskId);
+ rbReplies[task.taskId] = rr;
+ link(spawn(rr));
+ }
send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
-
-
break;
}
@@ -352,12 +319,8 @@ protected:
}
case PROCESS_EXIT: {
- if (isFT) {
- LOG(WARNING) << "Connection to master lost .. waiting for new master.";
- } else {
- const char* message = "Connection to master failed";
- invoke(bind(&Scheduler::error, sched, driver, -1, message));
- }
+ // TODO(benh): Don't wait for a new master forever.
+ LOG(WARNING) << "Connection to master lost .. waiting for new master.";
break;
}
@@ -416,17 +379,19 @@ ExecutorInfo Scheduler::getExecutorInfo(
// Default implementation of Scheduler::error that logs to stderr
-void Scheduler::error(SchedulerDriver* s, int code, const string &message)
+void Scheduler::error(SchedulerDriver* driver, int code, const string &message)
{
cerr << "Nexus error: " << message
<< " (error code: " << code << ")" << endl;
- s->stop();
+ driver->stop();
}
NexusSchedulerDriver::NexusSchedulerDriver(Scheduler* _sched,
- const string &_master)
- : sched(_sched), master(_master), running(false), process(NULL)
+ const string &_url,
+ FrameworkID _fid)
+ : sched(_sched), url(_url), fid(_fid), running(false),
+ process(NULL), detector(NULL)
{
// Create mutex and condition variable
pthread_mutexattr_t attr;
@@ -458,56 +423,58 @@ NexusSchedulerDriver::~NexusSchedulerDri
// to the user somehow.
Process::wait(process);
delete process;
+
+ MasterDetector::destroy(detector);
+
+ // Check and see if we need to shutdown a local cluster.
+ if (url == "local" || url == "localquiet")
+ local::shutdown();
}
-void NexusSchedulerDriver::start()
+int NexusSchedulerDriver::start()
{
Lock lock(&mutex);
if (running) {
- error(1, "cannot call start - scheduler is already running");
- return;
+ //error(1, "cannot call start - scheduler is already running");
+ return - 1;
}
- PID pid;
-
- string initServer = master;
-
- if (master == string("localquiet")) {
- // TODO(benh): Look up resources in environment variables.
- pid = run_nexus(1, 1, 1073741824, true, true);
- ostringstream ss;
- ss << pid;
- initServer = ss.str();
- } else if (master == string("local")) {
- // TODO(benh): Look up resources in environment variables.
- pid = run_nexus(1, 1, 1073741824, true, false);
- ostringstream ss;
- ss << pid;
- initServer = ss.str();
- }
-
const string& frameworkName = sched->getFrameworkName(this);
const ExecutorInfo& executorInfo = sched->getExecutorInfo(this);
- process = new SchedulerProcess(initServer, this, sched, frameworkName, executorInfo);
-
- Process::spawn(process);
+ process = new SchedulerProcess(this, sched, fid, frameworkName, executorInfo);
+ PID pid = Process::spawn(process);
+
+ // Check and see if we need to launch a local cluster.
+ if (url == "local") {
+ // TODO(benh): Get number of slaves and resources per slave from
+ // command line (or environment or configuration?).
+ PID master = local::launch(1, 1, 1073741824, true, false);
+ detector = new BasicMasterDetector(master, pid);
+ } else if (url == "localquiet") {
+ // TODO(benh): Get number of slaves and resources per slave from
+ // command line (or environment?).
+ PID master = local::launch(1, 1, 1073741824, true, true);
+ detector = new BasicMasterDetector(master, pid);
+ } else {
+ detector = MasterDetector::create(url, pid, false, true);
+ }
running = true;
-}
+ return 0;
+}
-void NexusSchedulerDriver::stop()
+int NexusSchedulerDriver::stop()
{
Lock lock(&mutex);
if (!running) {
// Don't issue an error (could lead to an infinite loop).
- // TODO(benh): It would be much cleaner to return success or failure!
- return;
+ return -1;
}
// TODO(benh): Do a Process::post instead?
@@ -519,97 +486,112 @@ void NexusSchedulerDriver::stop()
running = false;
pthread_cond_signal(&cond);
+
+ return 0;
}
-void NexusSchedulerDriver::join()
+int NexusSchedulerDriver::join()
{
Lock lock(&mutex);
while (running)
pthread_cond_wait(&cond, &mutex);
+
+ return 0;
}
-void NexusSchedulerDriver::run()
+int NexusSchedulerDriver::run()
{
- start();
- join();
+ int ret = start();
+ return ret != 0 ? ret : join();
}
-void NexusSchedulerDriver::killTask(TaskID tid)
+int NexusSchedulerDriver::killTask(TaskID tid)
{
Lock lock(&mutex);
if (!running) {
- error(1, "cannot call killTask - scheduler is not running");
- return;
+ //error(1, "cannot call killTask - scheduler is not running");
+ return -1;
}
// TODO(benh): Do a Process::post instead?
process->send(process->master,
process->pack<F2M_KILL_TASK>(process->fid, tid));
+
+ return 0;
}
-void NexusSchedulerDriver::replyToOffer(OfferID offerId,
- const vector<TaskDescription> &tasks,
- const string_map &params)
+int NexusSchedulerDriver::replyToOffer(OfferID offerId,
+ const vector<TaskDescription> &tasks,
+ const string_map &params)
{
Lock lock(&mutex);
if (!running) {
- error(1, "cannot call replyToOffer - scheduler is not running");
- return;
+ //error(1, "cannot call replyToOffer - scheduler is not running");
+ return -1;
}
// TODO(benh): Do a Process::post instead?
process->send(process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
+
+ return 0;
}
-void NexusSchedulerDriver::reviveOffers()
+int NexusSchedulerDriver::reviveOffers()
{
Lock lock(&mutex);
if (!running) {
- error(1, "cannot call reviveOffers - scheduler is not running");
- return;
+ //error(1, "cannot call reviveOffers - scheduler is not running");
+ return -1;
}
// TODO(benh): Do a Process::post instead?
process->send(process->master,
process->pack<F2M_REVIVE_OFFERS>(process->fid));
+
+ return 0;
}
-void NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+int NexusSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
{
Lock lock(&mutex);
if (!running) {
- error(1, "cannot call sendFrameworkMessage - scheduler is not running");
- return;
+ //error(1, "cannot call sendFrameworkMessage - scheduler is not running");
+ return -1;
}
+ // TODO(benh): Do a Process::post instead?
+
process->send(process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message));
+
+ return 0;
}
-void NexusSchedulerDriver::sendHints(const string_map& hints)
+int NexusSchedulerDriver::sendHints(const string_map& hints)
{
Lock lock(&mutex);
if (!running) {
- error(1, "cannot call sendHints - scheduler is not running");
- return;
+ //error(1, "cannot call sendHints - scheduler is not running");
+ return -1;
}
// TODO(*): Send the hints; for now, we do nothing
- error(1, "sendHints is not yet implemented");
+ //error(1, "sendHints is not yet implemented");
+ return -1;
}
Modified: incubator/mesos/trunk/src/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/process_based_isolation_module.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/process_based_isolation_module.cpp Sun Jun 5 05:34:18 2011
@@ -118,7 +118,7 @@ ProcessBasedIsolationModule::Reaper::Rea
: module(m)
{}
-
+
void ProcessBasedIsolationModule::Reaper::operator () ()
{
link(module->slave->getPID());
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:34:18 2011
@@ -2,16 +2,20 @@
#include <fstream>
#include <algorithm>
+
+#include "isolation_module_factory.hpp"
#include "slave.hpp"
#include "slave_webui.hpp"
-#include "isolation_module_factory.hpp"
-#include "url_processor.hpp"
-#define FT_TIMEOUT 10
+// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
+#ifdef __sun__
+#define gethostbyname2(name, _) gethostbyname(name)
+#endif
using std::list;
using std::make_pair;
using std::ostringstream;
+using std::istringstream;
using std::pair;
using std::queue;
using std::string;
@@ -25,10 +29,6 @@ using namespace nexus;
using namespace nexus::internal;
using namespace nexus::internal::slave;
-// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
-#ifdef __sun__
-#define gethostbyname2(name, _) gethostbyname(name)
-#endif
namespace {
@@ -63,61 +63,18 @@ public:
} /* namespace */
-Slave::Slave(const string &_master, Resources _resources, bool _local)
- : masterDetector(NULL), resources(_resources), local(_local), id(""),
- isolationType("process"), isolationModule(NULL)
-{
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else
- {
- isFT = false;
- if (urlPair.first == UrlProcessor::NEXUS)
- master = make_pid(urlPair.second.c_str());
- else
- master = make_pid(_master.c_str());
-
- if (!master)
- {
- cerr << "Slave failed to resolve master PID " << urlPair.second << endl;
- exit(1);
- }
- }
-}
-
+Slave::Slave(Resources _resources, bool _local, const string &_isolationType)
+ : id(""), resources(_resources), local(_local),
+ isolationType(_isolationType), isolationModule(NULL) {}
-Slave::Slave(const string &_master, Resources _resources, bool _local,
- const string &_isolationType)
- : masterDetector(NULL), resources(_resources), local(_local), id(""),
- isolationType(_isolationType), isolationModule(NULL)
-{
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- isFT = false;
- if (urlPair.first == UrlProcessor::NEXUS)
- master = make_pid(urlPair.second.c_str());
- else
- master = make_pid(_master.c_str());
-
- if (!master)
- {
- cerr << "Slave failed to resolve master PID " << urlPair.second << endl;
- exit(1);
- }
- }
-}
Slave::~Slave()
{
- if (masterDetector != NULL) {
- delete masterDetector;
- masterDetector = NULL;
- }
+ // TODO(matei): Add support for factory style destroy of objects!
+ if (isolationModule != NULL)
+ delete isolationModule;
+
+ // TODO(benh): Shut down and free executors?
}
@@ -151,13 +108,6 @@ void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
- if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
- } else {
- send(self(), pack<NEW_MASTER_DETECTED>("0", master));
- }
-
// Get our hostname
char buf[256];
gethostname(buf, sizeof(buf));
@@ -171,7 +121,7 @@ void Slave::operator () ()
if (!publicDns) {
publicDns = hostname;
}
-
+
FrameworkID fid;
TaskID tid;
TaskState taskState;
@@ -186,8 +136,7 @@ void Slave::operator () ()
PID masterPid;
unpack<NEW_MASTER_DETECTED>(masterSeq, masterPid);
- LOG(INFO) << "New master at " << masterPid
- << " with ephemeral id:" << masterSeq;
+ LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
redirect(master, masterPid);
master = masterPid;
@@ -329,7 +278,6 @@ void Slave::operator () ()
isolationModule->resourcesChanged(fw);
// Tell executor that it's registered and give it its queued tasks
send(from(), pack<S2E_REGISTER_REPLY>(this->id,
- hostname,
fw->name,
fw->executorInfo.initArg));
sendQueuedTasks(fw);
@@ -351,12 +299,12 @@ void Slave::operator () ()
isolationModule->resourcesChanged(fw);
}
}
- // Pass on the update to the master
- if (isFT) {
- string msg = tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState, data));
- rsend(master, S2M_FT_STATUS_UPDATE, msg.c_str(), sizeof(msg.c_str()));
- } else
- send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+
+ // Pass on the update to the master.
+ const string &msg =
+ tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState,
+ data));
+ rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
break;
}
@@ -377,56 +325,42 @@ void Slave::operator () ()
LOG(INFO) << "Process exited: " << from();
if (from() == master) {
- // TODO: Fault tolerance!
- if (isFT) {
- LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected.";
- } else {
- LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
- if (isolationModule != NULL)
- delete isolationModule;
- // TODO: Shut down executors?
- return;
+ LOG(WARNING) << "Master disconnected! "
+ << "Waiting for a new master to be elected.";
+ // TODO(benh): After so long waiting for a master, commit suicide.
+ } else {
+ // Check if an executor has exited.
+ foreachpair (_, Executor *ex, executors) {
+ if (from() == ex->pid) {
+ LOG(INFO) << "Executor for framework " << ex->frameworkId
+ << " disconnected";
+ Framework *framework = getFramework(fid);
+ if (framework != NULL) {
+ send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
+ killFramework(framework);
+ }
+ break;
+ }
}
}
- foreachpair (_, Executor *ex, executors) {
- if (from() == ex->pid) {
- LOG(INFO) << "Executor for framework " << ex->frameworkId
- << " disconnected";
- Framework *framework = getFramework(fid);
- if (framework != NULL) {
- send(master, pack<S2M_LOST_EXECUTOR>(id, ex->frameworkId, -1));
- killFramework(framework);
- }
- break;
- }
- }
-
break;
}
case M2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by master: " << from();
- // TODO(matei): Add support for factory style destroy of objects!
- if (isolationModule != NULL)
- delete isolationModule;
- // TODO: Shut down executors?
return;
}
-
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
- // TODO(matei): Add support for factory style destroy of objects!
- if (isolationModule != NULL)
- delete isolationModule;
- // TODO: Shut down executors?
return;
}
case PROCESS_TIMEOUT: {
break;
}
+
default: {
LOG(ERROR) << "Received unknown message ID " << msgid()
<< " from " << from();
@@ -512,6 +446,8 @@ void Slave::killFramework(Framework *fw)
// Called by isolation module when an executor process exits
+// TODO(benh): Make this callback be a message so that we can avoid
+// race conditions.
void Slave::executorExited(FrameworkID fid, int status)
{
if (Framework *f = getFramework(fid)) {
Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun 5 05:34:18 2011
@@ -165,8 +165,6 @@ public:
typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
bool isFT;
- string zkServers;
- MasterDetector *masterDetector;
PID master;
SlaveID id;
Resources resources;
@@ -177,10 +175,8 @@ public:
IsolationModule *isolationModule;
public:
- Slave(const string &_master, Resources resources, bool _local);
-
- Slave(const string &_master, Resources resources, bool _local,
- const string& isolationType);
+ Slave(Resources resources, bool local,
+ const string& isolationType = "process");
virtual ~Slave();
Modified: incubator/mesos/trunk/src/slave_main.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave_main.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave_main.cpp (original)
+++ incubator/mesos/trunk/src/slave_main.cpp Sun Jun 5 05:34:18 2011
@@ -8,19 +8,19 @@ using namespace std;
using namespace nexus::internal::slave;
-/* List of ZooKeeper host:port pairs. */
-string zookeeper = "";
-
-
void usage(const char *programName)
{
cerr << "Usage: " << programName
+ << " [--url URL]"
<< " [--cpus NUM]"
<< " [--mem NUM]"
<< " [--isolation TYPE]"
- << " [--zookeeper host:port]"
- << " [--quiet]"
- << " <master_pid>"
+ << " [--quiet]" << endl
+ << endl
+ << "URL (used for connecting to a master) may be one of:" << endl
+ << " nexus://id@host:port" << endl
+ << " zoo://host1:port1,host2:port2,..." << endl
+ << " zoofile://file where file contains a host:port pair per line"
<< endl;
}
@@ -33,22 +33,26 @@ int main(int argc, char **argv)
}
option options[] = {
+ {"url", required_argument, 0, 'u'},
{"cpus", required_argument, 0, 'c'},
{"mem", required_argument, 0, 'm'},
{"isolation", required_argument, 0, 'i'},
- {"zookeeper", required_argument, 0, 'z'},
{"quiet", no_argument, 0, 'q'},
};
+ string url = "";
Resources resources(1, 1 * Gigabyte);
string isolation = "process";
bool quiet = false;
int opt;
int index;
- while ((opt = getopt_long(argc, argv, "c:m:i:z:q", options, &index)) != -1) {
+ while ((opt = getopt_long(argc, argv, "u:c:m:i:q", options, &index)) != -1) {
switch (opt) {
- case 'c':
+ case 'u':
+ url = optarg;
+ break;
+ case 'c':
resources.cpus = atoi(optarg);
break;
case 'm':
@@ -57,15 +61,6 @@ int main(int argc, char **argv)
case 'i':
isolation = optarg;
break;
- case 'z':
-#ifndef USING_ZOOKEEPER
- cerr << "--zookeeper not supported in this build" << endl;
- usage(argv[0]);
- exit(1);
-#else
- zookeeper = optarg;
-#endif
- break;
case 'q':
quiet = true;
break;
@@ -80,36 +75,19 @@ int main(int argc, char **argv)
if (!quiet)
google::SetStderrLogging(google::INFO);
- else
- LeaderDetector::setQuiet(true);
+ FLAGS_log_dir = "/tmp";
FLAGS_logbufsecs = 1;
google::InitGoogleLogging(argv[0]);
- // Check that we either have zookeeper as an argument or exactly one
- // non-option argument (i.e., the master PID).
- if (zookeeper.empty() && optind != argc - 1) {
- usage(argv[0]);
- exit(1);
- }
-
- // Resolve the master PID.
- PID master;
-
- if (zookeeper.empty()) {
- istringstream iss(argv[optind]);
- if (!(iss >> master)) {
- cerr << "Failed to resolve master PID " << argv[optind] << endl;
- exit(1);
- }
- }
-
LOG(INFO) << "Build: " << BUILD_DATE << " by " << BUILD_USER;
LOG(INFO) << "Starting Nexus slave";
- Slave* slave = new Slave(master, resources, false, isolation);
+ Slave* slave = new Slave(resources, false, isolation);
PID pid = Process::spawn(slave);
+ MasterDetector *detector = MasterDetector::create(url, pid, false, quiet);
+
#ifdef NEXUS_WEBUI
if (chdir(dirname(argv[0])) != 0)
fatalerror("could not change into %s for running webui", dirname(argv[0]));
@@ -117,5 +95,8 @@ int main(int argc, char **argv)
#endif
Process::wait(pid);
+
+ MasterDetector::destroy(detector);
+
return 0;
}
Modified: incubator/mesos/trunk/src/tests/test_master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/test_master.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/test_master.cpp (original)
+++ incubator/mesos/trunk/src/tests/test_master.cpp Sun Jun 5 05:34:18 2011
@@ -65,26 +65,26 @@ public:
TEST(MasterTest, NoopFrameworkWithOneSlave)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
NoopScheduler sched(1);
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_TRUE(sched.registeredCalled);
EXPECT_EQ(1, sched.offersGotten);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, NoopFrameworkWithMultipleSlaves)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(10, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
NoopScheduler sched(10);
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_TRUE(sched.registeredCalled);
EXPECT_EQ(1, sched.offersGotten);
- kill_nexus();
+ local::shutdown();
}
@@ -122,7 +122,7 @@ public:
TEST(MasterTest, DuplicateTaskIdsInResponse)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "1";
@@ -134,14 +134,14 @@ TEST(MasterTest, DuplicateTaskIdsInRespo
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Duplicate task ID: 1", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooMuchMemoryInTask)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "1";
@@ -151,14 +151,14 @@ TEST(MasterTest, TooMuchMemoryInTask)
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooMuchCpuInTask)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "4";
@@ -168,14 +168,14 @@ TEST(MasterTest, TooMuchCpuInTask)
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooLittleCpuInTask)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "0";
@@ -185,14 +185,14 @@ TEST(MasterTest, TooLittleCpuInTask)
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Invalid task size: <0 CPUs, 1073741824 MEM>", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooLittleMemoryInTask)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "1";
@@ -202,14 +202,14 @@ TEST(MasterTest, TooLittleMemoryInTask)
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Invalid task size: <1 CPUs, 1 MEM>", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooMuchMemoryAcrossTasks)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "1";
@@ -220,14 +220,14 @@ TEST(MasterTest, TooMuchMemoryAcrossTask
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, TooMuchCpuAcrossTasks)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 3, 3 * Gigabyte, false, false);
+ PID master = local::launch(1, 3, 3 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
params["cpus"] = "2";
@@ -238,14 +238,14 @@ TEST(MasterTest, TooMuchCpuAcrossTasks)
NexusSchedulerDriver driver(&sched, master);
driver.run();
EXPECT_EQ("Too many resources accepted", sched.errorMessage);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, ResourcesReofferedAfterReject)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(10, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(10, 2, 1 * Gigabyte, false, false);
NoopScheduler sched1(10);
NexusSchedulerDriver driver1(&sched1, master);
@@ -259,14 +259,14 @@ TEST(MasterTest, ResourcesReofferedAfter
EXPECT_TRUE(sched2.registeredCalled);
EXPECT_EQ(1, sched2.offersGotten);
- kill_nexus();
+ local::shutdown();
}
TEST(MasterTest, ResourcesReofferedAfterBadResponse)
{
ASSERT_TRUE(GTEST_IS_THREADSAFE);
- PID master = run_nexus(1, 2, 1 * Gigabyte, false, false);
+ PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
vector<TaskDescription> tasks;
map<string, string> params;
@@ -284,7 +284,7 @@ TEST(MasterTest, ResourcesReofferedAfter
EXPECT_TRUE(sched2.registeredCalled);
EXPECT_EQ(1, sched2.offersGotten);
- kill_nexus();
+ local::shutdown();
}
@@ -324,9 +324,11 @@ TEST(MasterTest, SlaveLost)
Master m;
PID master = Process::spawn(&m);
- Slave s(master, Resources(2, 1 * Gigabyte), true);
+ Slave s(Resources(2, 1 * Gigabyte), true);
PID slave = Process::spawn(&s);
+ BasicMasterDetector detector(master, slave, true);
+
SlaveLostScheduler sched(slave);
NexusSchedulerDriver driver(&sched, master);
@@ -342,3 +344,80 @@ TEST(MasterTest, SlaveLost)
/* TODO(benh): Test lost slave due to missing heartbeats. */
+
+
+/**
+ * Scheduler driven by the second NexusSchedulerDriver in the failover
+ * test. It only records that registration happened and then stops the
+ * driver that delivered the callback.
+ */
+class FailoverScheduler : public Scheduler
+{
+public:
+  // Becomes true inside registered(); inspected by the test afterwards.
+  bool registeredCalled;
+
+  FailoverScheduler()
+    : registeredCalled(false) {}
+
+  virtual ~FailoverScheduler() {}
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*)
+  {
+    return ExecutorInfo("noexecutor", "");
+  }
+
+  virtual void registered(SchedulerDriver* driver, FrameworkID)
+  {
+    LOG(INFO) << "FailoverScheduler registered";
+    registeredCalled = true;
+    driver->stop();
+  }
+};
+
+
+/**
+ * First scheduler in the failover test. Once it is registered it starts
+ * a second driver (for failoverSched) passing its own FrameworkID, and
+ * records the message of any error() callback before stopping its driver.
+ */
+class FailingScheduler : public Scheduler
+{
+public:
+  FailoverScheduler *failoverSched;
+  PID master;
+  // Driver for failoverSched, owned by this object. NULL until
+  // registered() creates it, so the destructor never deletes garbage.
+  NexusSchedulerDriver *driver;
+  // Message passed to the most recent error() callback.
+  string errorMessage;
+
+  FailingScheduler(FailoverScheduler *_failoverSched, const PID &_master)
+    : failoverSched(_failoverSched), master(_master), driver(NULL) {}
+
+  virtual ~FailingScheduler() {
+    delete driver; // No-op when registered() never ran.
+  }
+
+  virtual ExecutorInfo getExecutorInfo(SchedulerDriver*) {
+    return ExecutorInfo("noexecutor", "");
+  }
+
+  virtual void registered(SchedulerDriver*, FrameworkID fid) {
+    LOG(INFO) << "FailingScheduler registered";
+    // NOTE(review): the three-argument driver constructor presumably
+    // re-registers as framework `fid` (scheduler failover) -- confirm.
+    driver = new NexusSchedulerDriver(failoverSched, master, fid);
+    driver->start();
+  }
+
+  virtual void error(SchedulerDriver* d,
+                     int code,
+                     const std::string& message) {
+    errorMessage = message;
+    d->stop();
+  }
+
+private:
+  // Owns a raw pointer; a copy would delete it twice, so forbid copying.
+  FailingScheduler(const FailingScheduler&);
+  FailingScheduler& operator = (const FailingScheduler&);
+};
+
+
+/**
+ * Scheduler failover: a FailingScheduler registers, then starts a second
+ * driver for a FailoverScheduler with its FrameworkID. The test expects
+ * the first scheduler to get a "Failover" error and the second one to
+ * be registered.
+ */
+TEST(MasterTest, SchedulerFailover)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  PID master = local::launch(1, 2, 1 * Gigabyte, false, false);
+
+  FailoverScheduler failoverSched;
+  FailingScheduler failingSched(&failoverSched, master);
+
+  NexusSchedulerDriver driver(&failingSched, master);
+  driver.run();
+
+  EXPECT_EQ("Failover", failingSched.errorMessage);
+
+  // failingSched.driver is only assigned in FailingScheduler::registered().
+  // Check it rather than dereferencing blindly, and use a non-fatal check
+  // so local::shutdown() below still runs if registration failed.
+  EXPECT_TRUE(failingSched.driver != NULL);
+  if (failingSched.driver != NULL) {
+    failingSched.driver->join();
+  }
+
+  EXPECT_TRUE(failoverSched.registeredCalled);
+
+  local::shutdown();
+}
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun 5 05:34:18 2011
@@ -161,7 +161,7 @@ void ReliableProcess::rsend(const PID &t
}
-MSGID ReliableProcess::receive(time_t secs)
+MSGID ReliableProcess::receive(double secs)
{
// Record sequence number for current (now old) _reliable_ message
// and also free the message.
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun 5 05:34:18 2011
@@ -22,7 +22,7 @@ class ReliableProcess : public Process
{
public:
ReliableProcess();
- ~ReliableProcess();
+ virtual ~ReliableProcess();
protected:
/**
@@ -74,7 +74,7 @@ protected:
virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
/* Blocks for message at most specified seconds. */
- virtual MSGID receive(time_t);
+ virtual MSGID receive(double secs);
/**
* Redirect unacknolwedged messages to be sent to a different PID.
Modified: incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/serialization.cpp Sun Jun 5 05:34:18 2011
@@ -9,6 +9,12 @@ namespace process { namespace serializat
/* TODO(*): Check stream errors! Report errors! Ahhhh! */
+// Encodes a bool as exactly one byte: 1 for true, 0 for false.
+void serializer::operator & (const bool &b)
+{
+  const char byte = b ? '\1' : '\0';
+  stream.put(byte);
+}
+
+
void serializer::operator & (const int32_t &i)
{
uint32_t netInt = htonl((uint32_t) i);
@@ -52,6 +58,12 @@ void serializer::operator & (const PID &
}
+// Decodes one byte written by serializer::operator & (const bool &):
+// any nonzero byte is true. If no byte is available, get(char&) leaves
+// `c` untouched (still 0) and sets failbit, so b becomes false instead
+// of EOF (-1) being implicitly converted to true.
+void deserializer::operator & (bool &b)
+{
+  char c = 0;
+  stream.get(c);
+  b = (c != 0);
+}
+
+
void deserializer::operator & (int32_t &i)
{
uint32_t netInt;
Modified: incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/serialization.hpp Sun Jun 5 05:34:18 2011
@@ -14,6 +14,7 @@ struct serializer
serializer(std::ostringstream& s) : stream(s) {}
+ void operator & (const bool &);
void operator & (const int32_t &);
void operator & (const int64_t &);
void operator & (const size_t &);
@@ -27,6 +28,7 @@ struct deserializer
deserializer(std::istringstream &s) : stream(s) {}
+ void operator & (bool &);
void operator & (int32_t &);
void operator & (int64_t &);
void operator & (size_t &);
Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun 5 05:34:18 2011
@@ -171,7 +171,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -186,7 +186,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -204,7 +204,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -223,7 +223,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -244,7 +244,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -267,7 +267,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -292,7 +292,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -319,7 +319,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -348,7 +348,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -379,7 +379,7 @@ protected:
const char *data;
size_t length;
- data = Process::body(&length);
+ data = P::body(&length);
std::string s(data, length);
std::istringstream is(s);
process::serialization::deserializer d(is);
@@ -455,7 +455,7 @@ protected:
std::string data = os.str();
- Process::send(to, ID, data.data(), data.size());
+ P::send(to, ID, data.data(), data.size());
}
template <MSGID ID>
@@ -585,7 +585,7 @@ protected:
MSGID receive(double secs)
{
- return Process::receive(secs);
+ return P::receive(secs);
}
template <MSGID ID>
Modified: incubator/mesos/trunk/src/url_processor.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/url_processor.cpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/url_processor.cpp (original)
+++ incubator/mesos/trunk/src/url_processor.cpp Sun Jun 5 05:34:18 2011
@@ -53,8 +53,7 @@ pair<UrlProcessor::URLType, string> UrlP
} else {
- LOG(WARNING) << "Could not parse master/zoo URL";
- return pair<UrlProcessor::URLType, string>(UrlProcessor::UNKNOWN, "");
+ return pair<UrlProcessor::URLType, string>(UrlProcessor::UNKNOWN, url);
}
}
Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131794&r1=1131793&r2=1131794&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun 5 05:34:18 2011
@@ -7,9 +7,7 @@
*
* To provide for varying underlying implementations the pimpl idiom
* (also known as the compiler-firewall, bridge pattern, etc) was used
- * for the ZooKeeper class. While the Watcher class may need some to
- * support varying underlying implementation details, we choose for
- * now to keep it a cleaner interface.
+ * for the ZooKeeper class.
*
* Author: Benjamin Hindman <be...@berkeley.edu>
*/