Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:19:53 UTC
svn commit: r1132287 [1/2] - in /incubator/mesos/trunk/src: common/
detector/ local/ master/ messaging/ sched/ slave/ tests/
Author: benh
Date: Sun Jun 5 09:19:53 2011
New Revision: 1132287
URL: http://svn.apache.org/viewvc?rev=1132287&view=rev
Log:
Added a "SlavesManager" to help coordinate which slaves are an active part of a Mesos cluster. This includes ZooKeeper support for when using ZooKeeper as the persistant backing store of active slaves.
Modified:
incubator/mesos/trunk/src/common/resources.cpp
incubator/mesos/trunk/src/common/tokenize.cpp
incubator/mesos/trunk/src/common/tokenize.hpp
incubator/mesos/trunk/src/common/zookeeper.cpp
incubator/mesos/trunk/src/detector/detector.cpp
incubator/mesos/trunk/src/local/local.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/messaging/messages.cpp
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/messaging/messages.proto
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/tests/master_test.cpp
Modified: incubator/mesos/trunk/src/common/resources.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/resources.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/resources.cpp (original)
+++ incubator/mesos/trunk/src/common/resources.cpp Sun Jun 5 09:19:53 2011
@@ -569,7 +569,7 @@ Resource Resources::parse(const string&
if (index == 0) {
// This is a range.
Resource::Ranges ranges;
- const vector<string>& tokens = tokenize(temp, "[]-,\n");
+ const vector<string>& tokens = tokenize::tokenize(temp, "[]-,\n");
if (tokens.size() % 2 != 0) {
LOG(FATAL) << "Error parsing value for " << name
<< ", expecting one or more \"ranges\"";
@@ -596,7 +596,7 @@ Resource Resources::parse(const string&
if (index == 0) {
// This is a set.
Resource::Set set;
- const vector<string>& tokens = tokenize(temp, "{},\n");
+ const vector<string>& tokens = tokenize::tokenize(temp, "{},\n");
for (int i = 0; i < tokens.size(); i++) {
set.add_item(tokens[i]);
}
@@ -633,10 +633,10 @@ Resources Resources::parse(const string&
// Tokenize and parse the value of "resources".
Resources resources;
- vector<string> tokens = tokenize(s, ";\n");
+ vector<string> tokens = tokenize::tokenize(s, ";\n");
for (int i = 0; i < tokens.size(); i++) {
- const vector<string>& pairs = tokenize(tokens[i], ":");
+ const vector<string>& pairs = tokenize::tokenize(tokens[i], ":");
if (pairs.size() != 2) {
LOG(FATAL) << "bad value for resources, missing ':' within " << pairs[0];
}
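
For reference, the delimiters above imply a resource string of semicolon-separated name:value pairs, with ranges in "[...]" and sets in "{...}". A minimal sketch; the resource names and values are illustrative:

    // Illustrative: the textual format Resources::parse() expects,
    // given the delimiters used above.
    Resources resources =
      Resources::parse("cpus:4;mem:1024;ports:[31000-32000];disks:{sda1,sda2}");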
Modified: incubator/mesos/trunk/src/common/tokenize.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.cpp (original)
+++ incubator/mesos/trunk/src/common/tokenize.cpp Sun Jun 5 09:19:53 2011
@@ -1,10 +1,15 @@
+#include <sstream>
+
+#include "foreach.hpp"
#include "tokenize.hpp"
+using std::map;
using std::string;
+using std::ostringstream;
using std::vector;
-namespace mesos { namespace internal {
+namespace tokenize {
vector<string> tokenize(const string& s, const string& delims)
{
@@ -30,4 +35,24 @@ vector<string> tokenize(const string& s,
}
}
-}} /* namespace mesos { namespace internal { */
+
+map<string, vector<string> > pairs(const string& s, const string& delim1, const string& delim2)
+{
+ map<string, vector<string> > result;
+
+ const vector<string>& tokens = tokenize(s, delim1);
+ foreach (const string& token, tokens) {
+ const vector<string>& pairs = tokenize(token, delim2);
+ if (pairs.size() != 2) {
+ ostringstream out;
+ out << "failed to split '" << token << "' with '" << delim2 << "'";
+ throw TokenizeException(out.str());
+ }
+
+ result[pairs[0]].push_back(pairs[1]);
+ }
+
+ return result;
+}
+
+} // namespace tokenize {
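
A minimal usage sketch of the new tokenize::pairs() helper (the input string is illustrative): it splits the input on delim1, splits each token on delim2, and throws TokenizeException when a token does not split into exactly two parts.

    #include <iostream>
    #include <map>
    #include <string>
    #include <vector>

    #include "tokenize.hpp"

    int main()
    {
      // "hostname=foo,port=5051" -> { "hostname" -> ["foo"], "port" -> ["5051"] }
      std::map<std::string, std::vector<std::string> > result =
          tokenize::pairs("hostname=foo,port=5051", ",", "=");

      // A token that does not split into exactly two parts (e.g. "hostname")
      // makes pairs() throw tokenize::TokenizeException.
      std::cout << result["port"].front() << std::endl; // "5051"
      return 0;
    }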
Modified: incubator/mesos/trunk/src/common/tokenize.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/tokenize.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/tokenize.hpp (original)
+++ incubator/mesos/trunk/src/common/tokenize.hpp Sun Jun 5 09:19:53 2011
@@ -1,18 +1,33 @@
#ifndef __TOKENIZE_HPP__
#define __TOKENIZE_HPP__
+#include <map>
+#include <stdexcept>
#include <string>
#include <vector>
-namespace mesos { namespace internal {
+namespace tokenize {
+
+class TokenizeException : public std::runtime_error
+{
+public:
+ TokenizeException(const std::string& what) : std::runtime_error(what) {}
+};
+
/**
* Utility function to tokenize a string based on some delimiters.
*/
-std::vector<std::string> tokenize(const std::string& s, const std::string& delims);
+std::vector<std::string> tokenize(const std::string& s,
+ const std::string& delims);
+
+
+std::map<std::string, std::vector<std::string> > pairs(const std::string& s,
+ const std::string& delim1,
+ const std::string& delim2);
-}} /* namespace mesos { namespace internal { */
+} // namespace tokenize {
#endif // __TOKENIZE_HPP__
Modified: incubator/mesos/trunk/src/common/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/common/zookeeper.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/common/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/common/zookeeper.cpp Sun Jun 5 09:19:53 2011
@@ -331,7 +331,7 @@ private:
{
ZooKeeperProcess* zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
process::dispatch(zooKeeperProcess->pid, &WatcherProcess::event,
- zooKeeperProcess->zk, type, state, path);
+ zooKeeperProcess->zk, type, state, string(path));
}
static void voidCompletion(int ret, const void *data)
@@ -355,8 +355,10 @@ private:
Promise<int> promise = (*args).get<0>();
string* result = (*args).get<1>();
- if (result != NULL && value != NULL) {
- result->assign(value);
+ if (ret == 0) {
+ if (result != NULL) {
+ result->assign(value);
+ }
}
promise.set(ret);
@@ -373,8 +375,10 @@ private:
Promise<int> promise = (*args).get<0>();
Stat *stat_result = (*args).get<1>();
- if (stat_result != NULL && stat != NULL) {
- *stat_result = *stat;
+ if (ret == 0) {
+ if (stat_result != NULL) {
+ *stat_result = *stat;
+ }
}
promise.set(ret);
@@ -392,12 +396,14 @@ private:
string* result = (*args).get<1>();
Stat* stat_result = (*args).get<2>();
- if (result != NULL && value != NULL && value_len > 0) {
- result->assign(value, value_len);
- }
+ if (ret == 0) {
+ if (result != NULL) {
+ result->assign(value, value_len);
+ }
- if (stat_result != NULL && stat != NULL) {
- *stat_result = *stat;
+ if (stat_result != NULL) {
+ *stat_result = *stat;
+ }
}
promise.set(ret);
@@ -414,9 +420,11 @@ private:
Promise<int> promise = (*args).get<0>();
vector<string>* results = (*args).get<1>();
- if (results != NULL && values != NULL) {
- for (int i = 0; i < values->count; i++) {
- results->push_back(values->data[i]);
+ if (ret == 0) {
+ if (results != NULL) {
+ for (int i = 0; i < values->count; i++) {
+ results->push_back(values->data[i]);
+ }
}
}
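
The common shape of the fixes above: a completion callback must not touch its output arguments unless the operation actually succeeded (ret == 0, i.e. ZOK), because ZooKeeper passes no meaningful value on failure. A minimal standalone sketch of the guard, with the tuple plumbing from the diff elided:

    #include <string>

    // Only dereference 'value' when the operation succeeded; on failure the
    // pointer is not meaningful and the caller just observes 'ret'.
    static void completion(int ret, const char* value, std::string* result)
    {
      if (ret == 0 && result != NULL) {
        result->assign(value);
      }
    }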
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun 5 09:19:53 2011
@@ -10,16 +10,15 @@
#include "common/fatal.hpp"
#include "common/foreach.hpp"
+#ifdef WITH_ZOOKEEPER
+#include "common/zookeeper.hpp"
+#endif
#include "messaging/messages.hpp"
#include "detector.hpp"
#include "url_processor.hpp"
-#ifdef WITH_ZOOKEEPER
-#include "zookeeper.hpp"
-#endif
-
using namespace mesos;
using namespace mesos::internal;
@@ -71,8 +70,8 @@ public:
* for slaves and frameworks)
* @param quiet verbosity logging level for underlying ZooKeeper library
*/
- ZooKeeperMasterDetector(const std::string& servers,
- const std::string& znode,
+ ZooKeeperMasterDetector(const string& servers,
+ const string& znode,
const UPID& pid,
bool contend = false,
bool quiet = false);
@@ -82,19 +81,18 @@ public:
/**
* ZooKeeper watcher callback.
*/
- virtual void process(ZooKeeper *zk, int type, int state,
- const std::string &path);
+ virtual void process(ZooKeeper *zk, int type, int state, const string &path);
private:
/**
* @param s sequence id
*/
- void setId(const std::string &s);
+ void setId(const string &s);
/**
* @return current sequence id if contending to be a master
*/
- std::string getId();
+ string getId();
/**
* Attempts to detect a master.
@@ -105,10 +103,10 @@ private:
* @param seq sequence id of a master
* @return PID corresponding to a master
*/
- UPID lookupMasterPID(const std::string &seq) const;
+ UPID lookupMasterPID(const string &seq) const;
- std::string servers;
- std::string znode;
+ string servers;
+ string znode;
UPID pid;
bool contend;
bool reconnect;
@@ -116,9 +114,9 @@ private:
ZooKeeper *zk;
// Our sequence string if contending to be a master.
- std::string mySeq;
+ string mySeq;
- std::string currentMasterSeq;
+ string currentMasterSeq;
UPID currentMasterPID;
// Reconnect timer.
@@ -130,7 +128,7 @@ private:
MasterDetector::~MasterDetector() {}
-MasterDetector* MasterDetector::create(const std::string &url,
+MasterDetector* MasterDetector::create(const string &url,
const UPID &pid,
bool contend,
bool quiet)
@@ -154,15 +152,21 @@ MasterDetector* MasterDetector::create(c
// TODO(benh): Consider actually using the chroot feature of
// ZooKeeper, rather than just using its syntax.
size_t index = urlPair.second.find("/");
- if (index == string::npos)
+ if (index == string::npos) {
fatal("expecting chroot path for ZooKeeper");
+ }
+
+ const string &servers = urlPair.second.substr(0, index);
+
const string &znode = urlPair.second.substr(index);
- if (znode == "/")
+ if (znode == "/") {
fatal("expecting chroot path for ZooKeeper ('/' is not supported)");
- const string &servers = urlPair.second.substr(0, index);
+ }
+
detector = new ZooKeeperMasterDetector(servers, znode, pid, contend, quiet);
#else
- fatal("ZooKeeper not supported in this build");
+ fatal("Cannot detect masters with 'zoo://', "
+ "ZooKeeper is not supported in this build");
#endif // #ifdef WITH_ZOOKEEPER
break;
}
@@ -435,8 +439,8 @@ void ZooKeeperMasterDetector::process(Zo
reconnect = true;
} else {
- LOG(INFO) << "Unimplemented watch event: (state is "
- << state << " and type is " << type << ")";
+ LOG(WARNING) << "Unimplemented watch event: (state is "
+ << state << " and type is " << type << ")";
}
}
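
For reference, the URL shape that MasterDetector::create() parses above: everything between "zoo://" and the first "/" is the ZooKeeper server list, and the remainder is the chroot znode (a bare "/" is rejected). A minimal sketch; the hosts are illustrative and 'pid' is assumed to be the UPID that should receive detector messages:

    // Illustrative: detecting (not contending for) a master via ZooKeeper.
    MasterDetector* detector =
        MasterDetector::create("zoo://host1:2181,host2:2181/mesos",
                               pid, false, false);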
Modified: incubator/mesos/trunk/src/local/local.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/local/local.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/local/local.cpp (original)
+++ incubator/mesos/trunk/src/local/local.cpp Sun Jun 5 09:19:53 2011
@@ -55,10 +55,12 @@ static MasterDetector* detector = NULL;
void registerOptions(Configurator* configurator)
{
- configurator->addOption<int>("slaves", 's', "Number of slaves", 1);
Logging::registerOptions(configurator);
Master::registerOptions(configurator);
Slave::registerOptions(configurator);
+ configurator->addOption<int>("num_slaves",
+ "Number of slaves to create for local cluster",
+ 1);
}
@@ -69,7 +71,8 @@ PID<Master> launch(int numSlaves,
bool quiet)
{
Configuration conf;
- conf.set("slaves", numSlaves);
+ conf.set("slaves", "*");
+ conf.set("num_slaves", numSlaves);
conf.set("quiet", quiet);
stringstream out;
@@ -83,16 +86,18 @@ PID<Master> launch(int numSlaves,
PID<Master> launch(const Configuration& conf,
bool initLogging)
{
- int numSlaves = conf.get<int>("slaves", 1);
+ int numSlaves = conf.get<int>("num_slaves", 1);
bool quiet = conf.get<bool>("quiet", false);
- if (master != NULL)
+ if (master != NULL) {
fatal("can only launch one local cluster at a time (for now)");
+ }
if (initLogging) {
pthread_once(&glog_initialized, initialize_glog);
- if (!quiet)
+ if (!quiet) {
google::SetStderrLogging(google::INFO);
+ }
}
master = new Master(conf);
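
Note the rename above: the size of a local cluster now comes from "num_slaves", freeing "slaves" for the SlavesManager option. A minimal sketch of launching a local cluster after this change (namespace qualification omitted):

    // Illustrative: a five-slave local cluster.
    Configuration conf;
    conf.set("num_slaves", 5);   // previously conf.set("slaves", 5)
    PID<Master> master = launch(conf, true);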
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:19:53 2011
@@ -4,7 +4,12 @@
#include <google/protobuf/descriptor.h>
+#include "config/config.hpp"
+
#include "common/date_utils.hpp"
+#ifdef WITH_ZOOKEEPER
+#include "common/zookeeper.hpp"
+#endif
#include "allocator.hpp"
#include "allocator_factory.hpp"
@@ -15,10 +20,13 @@ using namespace mesos;
using namespace mesos::internal;
using namespace mesos::internal::master;
+using boost::bad_lexical_cast;
using boost::lexical_cast;
using boost::unordered_map;
using boost::unordered_set;
+using process::HttpInternalServerErrorResponse;
+using process::HttpNotFoundResponse;
using process::HttpOKResponse;
using process::HttpResponse;
using process::HttpRequest;
@@ -28,6 +36,7 @@ using process::Promise;
using process::UPID;
using std::endl;
+using std::map;
using std::max;
using std::ostringstream;
using std::pair;
@@ -54,7 +63,7 @@ protected:
while (true) {
receive(1);
if (name() == process::TIMEOUT) {
- dispatch(master, &Master::timerTick);
+ process::dispatch(master, &Master::timerTick);
} else if (name() == process::EXITED) {
return;
}
@@ -118,13 +127,711 @@ private:
const PID<Master> master;
};
+} // namespace {
+
+
+#ifdef WITH_ZOOKEEPER
+
+// Forward declaration of watcher.
+class ZooKeeperSlavesManagerStorageWatcher;
+
+
+class ZooKeeperSlavesManagerStorage : public SlavesManagerStorage
+{
+public:
+ ZooKeeperSlavesManagerStorage(const string& _servers,
+ const string& _znode,
+ const PID<SlavesManager>& _slavesManager);
+
+ virtual ~ZooKeeperSlavesManagerStorage();
+
+ virtual Promise<bool> add(const string& hostname, uint16_t port);
+ virtual Promise<bool> remove(const string& hostname, uint16_t port);
+ virtual Promise<bool> activate(const string& hostname, uint16_t port);
+ virtual Promise<bool> deactivate(const string& hostname, uint16_t port);
+
+ Promise<bool> connected();
+ Promise<bool> reconnecting();
+ Promise<bool> reconnected();
+ Promise<bool> expired();
+ Promise<bool> updated(const string& path);
+
+private:
+ const string servers;
+ const string znode;
+ const PID<SlavesManager> slavesManager;
+ ZooKeeper* zk;
+ ZooKeeperSlavesManagerStorageWatcher* watcher;
+};
+
+
+class ZooKeeperSlavesManagerStorageWatcher : public Watcher
+{
+public:
+ ZooKeeperSlavesManagerStorageWatcher(const PID<ZooKeeperSlavesManagerStorage>& _pid)
+ : pid(_pid), reconnect(false) {}
+
+ virtual ~ZooKeeperSlavesManagerStorageWatcher() {}
+
+ virtual void process(ZooKeeper* zk, int type, int state, const string& path)
+ {
+ if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Check if this is a reconnect.
+ if (!reconnect) {
+ // Initial connect.
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::connected);
+ } else {
+ // Reconnected.
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnected);
+ }
+ } else if ((state == ZOO_EXPIRED_SESSION_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // Session expiration. Let the manager take care of it.
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::expired);
+
+ // If this watcher is reused, the next connect won't be a reconnect.
+ reconnect = false;
+ } else if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_CHANGED_EVENT)) {
+ // Let the manager deal with file changes.
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::updated, path);
+ } else if ((state == ZOO_CONNECTING_STATE) && (type == ZOO_SESSION_EVENT)) {
+ // The client library automatically reconnects, taking into
+ // account failed servers in the connection string,
+ // appropriately handling the "herd effect", etc.
+ reconnect = true;
+ process::dispatch(pid, &ZooKeeperSlavesManagerStorage::reconnecting);
+ } else {
+ LOG(WARNING) << "Unimplemented watch event: (state is "
+ << state << " and type is " << type << ")";
+ }
+ }
+
+private:
+ const PID<ZooKeeperSlavesManagerStorage> pid;
+ bool reconnect;
+};
+
+
+ZooKeeperSlavesManagerStorage::ZooKeeperSlavesManagerStorage(const string& _servers,
+ const string& _znode,
+ const PID<SlavesManager>& _slavesManager)
+ : servers(_servers), znode(_znode), slavesManager(_slavesManager)
+{
+ PID<ZooKeeperSlavesManagerStorage> pid(*this);
+ watcher = new ZooKeeperSlavesManagerStorageWatcher(pid);
+ zk = new ZooKeeper(servers, 10000, watcher);
+}
+
+
+ZooKeeperSlavesManagerStorage::~ZooKeeperSlavesManagerStorage()
+{
+ delete zk;
+ delete watcher;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::add(const string& hostname, uint16_t port)
+{
+ int ret;
+ string result;
+ Stat stat;
+
+ ret = zk->get(znode + "/active", true, &result, &stat);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Failed to get '" << znode + "/active"
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ ostringstream out;
+
+ if (result.size() == 0) {
+ out << hostname << ":" << port;
+ } else {
+ out << "," << hostname << ":" << port;
+ }
+
+ result += out.str();
+
+ // Set the data in the znode.
+ ret = zk->set(znode + "/active", result, stat.version);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Could not add slave " << hostname << ":" << port
+ << " to '" << znode + "/active' in ZooKeeper! ("
+ << zk->error(ret) << ")";
+ return false;
+ }
+
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::remove(const string& hostname, uint16_t port)
+{
+ string files[] = { "/active", "/inactive" };
+
+ foreach (const string& file, files) {
+ int ret;
+ string result;
+ Stat stat;
+
+ ret = zk->get(znode + file, true, &result, &stat);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Failed to get '" << znode + file
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ ostringstream out;
+ out << hostname << ":" << port;
+
+ size_t index = result.find(out.str());
+
+ if (index != string::npos) {
+ if (index == 0) {
+ result.erase(index, out.str().size() + 1);
+ } else {
+ result.erase(index - 1, out.str().size() + 1);
+ }
+
+ // Set the data in the znode.
+ ret = zk->set(znode + file, result, stat.version);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Could not remove slave " << hostname << ":" << port
+ << " to '" << znode + file << "' in ZooKeeper! ("
+ << zk->error(ret) << ")";
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::activate(const string& hostname, uint16_t port)
+{
+ fatal("unimplemented");
+ return false;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::deactivate(const string& hostname, uint16_t port)
+{
+ fatal("unimplemented");
+ return false;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::connected()
+{
+ int ret;
+
+ static const string delimiter = "/";
+
+ // Assume the configured znode path does not end with a "/".
+ CHECK(znode.at(znode.length() - 1) != '/');
+
+ // Create directory path znodes as necessary.
+ size_t index = znode.find(delimiter, 0);
+
+ while (index < string::npos) {
+ // Get out the prefix to create.
+ index = znode.find(delimiter, index + 1);
+ string prefix = znode.substr(0, index);
+
+ // Create the node (even if it already exists).
+ ret = zk->create(prefix, "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+ if (ret != ZOK && ret != ZNODEEXISTS) {
+ // Okay, consider this a failure (maybe we lost our connection
+ // to ZooKeeper), increment the failure count, log the issue,
+ // and perhaps try again when ZooKeeper issues get sorted out.
+ LOG(WARNING) << "Failed to create '" << znode
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+ }
+
+ // Now make sure the 'active' znode is created.
+ ret = zk->create(znode + "/active", "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+ if (ret != ZOK && ret != ZNODEEXISTS) {
+ LOG(WARNING) << "Failed to create '" << znode + "/active"
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ // Now make sure the 'inactive' znode is created.
+ ret = zk->create(znode + "/inactive", "", ZOO_OPEN_ACL_UNSAFE, 0, NULL);
+
+ if (ret != ZOK && ret != ZNODEEXISTS) {
+ LOG(WARNING) << "Failed to create '" << znode + "/inactive"
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ // Reconcile what's in the znodes versus what we have in memory
+ // (this also puts watches on these znodes).
+ updated(znode + "/active");
+ updated(znode + "/inactive");
+
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::reconnecting()
+{
+ LOG(INFO) << "ZooKeeperSlavesManagerStorage is attempting to reconnect";
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::reconnected()
+{
+ LOG(INFO) << "ZooKeeperSlavesManagerStorage has reconnected";
+
+ // Reconcile what's in the znodes versus what we have in memory
+ // (this also puts watches on these znodes).
+ updated(znode + "/active");
+ updated(znode + "/inactive");
+
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::expired()
+{
+ LOG(WARNING) << "ZooKeeperSlavesManagerStorage session expired!";
+
+ CHECK(zk != NULL);
+ delete zk;
+
+ zk = new ZooKeeper(servers, 10000, watcher);
+
+ // TODO(benh): Put mechanisms in place such that reconnects may
+ // fail (or just take too long).
+
+ return true;
+}
+
+
+Promise<bool> ZooKeeperSlavesManagerStorage::updated(const string& path)
+{
+ LOG(INFO) << "Slave information at '" << path
+ << "' in ZooKeeper has been updated ... propogating changes";
+
+ int ret;
+ string result;
+
+ if (path == znode + "/active") {
+ ret = zk->get(znode + "/active", true, &result, NULL);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Failed to get '" << znode + "/active"
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ // Parse what's in ZooKeeper into hostname port pairs.
+ map<string, set<uint16_t> > active;
+
+ const vector<string>& tokens = tokenize::tokenize(result, ",");
+ foreach (const string& token, tokens) {
+ const vector<string>& pairs = tokenize::tokenize(token, ":");
+ if (pairs.size() != 2) {
+ LOG(WARNING) << "Bad data in '" << znode + "/active"
+ << "', could not parse " << token;
+ return false;
+ }
+
+ try {
+ active[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+ } catch (const bad_lexical_cast&) {
+ LOG(WARNING) << "Bad data in '" << znode + "/active"
+ << "', could not parse " << token;
+ return false;
+ }
+ }
+
+ process::dispatch(slavesManager, &SlavesManager::updateActive, active);
+ } else if (path == znode + "/inactive") {
+ ret = zk->get(znode + "/inactive", true, &result, NULL);
+
+ if (ret != ZOK) {
+ LOG(WARNING) << "Failed to get '" << znode + "/inactive"
+ << "' in ZooKeeper! (" << zk->error(ret) << ")";
+ return false;
+ }
+
+ // Parse what's in ZooKeeper into hostname port pairs.
+ map<string, set<uint16_t> > inactive;
+
+ const vector<string>& tokens = tokenize::tokenize(result, ",");
+ foreach (const string& token, tokens) {
+ const vector<string>& pairs = tokenize::tokenize(token, ":");
+ if (pairs.size() != 2) {
+ LOG(WARNING) << "Bad data in '" << znode + "/inactive"
+ << "', could not parse " << token;
+ return false;
+ }
+
+ try {
+ inactive[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+ } catch (const bad_lexical_cast&) {
+ LOG(WARNING) << "Bad data in '" << znode + "/inactive"
+ << "', could not parse " << token;
+ return false;
+ }
+ }
+
+ process::dispatch(slavesManager, &SlavesManager::updateInactive, inactive);
+ } else {
+ LOG(WARNING) << "Not expecting changes to path '"
+ << path << "' in ZooKeeper";
+ return false;
+ }
+
+ return true;
+}
+
+#endif // WITH_ZOOKEEPER
+
+
+SlavesManager::SlavesManager(const Configuration& conf,
+ const PID<Master>& _master)
+ : process::Process<SlavesManager>("slaves"),
+ master(_master)
+{
+ // Create the slave manager storage based on configuration.
+ const string& slaves = conf.get<string>("slaves", "*");
+
+ // Check if 'slaves' starts with "zoo://".
+ string zoo = "zoo://";
+ size_t index = slaves.find(zoo);
+ if (index == 0) {
+#ifdef WITH_ZOOKEEPER
+ // TODO(benh): Consider actually using the chroot feature of
+ // ZooKeeper, rather than just using its syntax.
+ string temp = slaves.substr(zoo.size());
+ index = temp.find("/");
+ if (index == string::npos) {
+ fatal("Expecting chroot path for ZooKeeper");
+ }
+
+ const string& servers = temp.substr(0, index);
+
+ const string& znode = temp.substr(index);
+ if (znode == "/") {
+ fatal("Expecting chroot path for ZooKeeper ('/' is not supported)");
+ }
+
+ storage = new ZooKeeperSlavesManagerStorage(servers, znode, self());
+ process::spawn(storage);
+#else
+ fatal("Cannot get active/inactive slave information using 'zoo://',"
+ " ZooKeeper is not supported in this build");
+#endif // WITH_ZOOKEEPER
+ } else {
+ // Parse 'slaves' as initial hostname:port pairs.
+ if (slaves != "*") {
+ const vector<string>& tokens = tokenize::tokenize(slaves, ",");
+ foreach (const string& token, tokens) {
+ const vector<string>& pairs = tokenize::tokenize(token, ":");
+ if (pairs.size() != 2) {
+ fatal("Failed to parse \"%s\" in option 'slaves'", token.c_str());
+ }
+
+ try {
+ active[pairs[0]].insert(lexical_cast<uint16_t>(pairs[1]));
+ } catch (const bad_lexical_cast&) {
+ fatal("Failed to parse \"%s\" in option 'slaves'", token.c_str());
+ }
+ }
+ }
+
+ storage = new SlavesManagerStorage();
+ process::spawn(storage);
+ }
+
+ // Set up our HTTP endpoints.
+ install("add", &SlavesManager::add);
+ install("remove", &SlavesManager::remove);
+}
+
+
+SlavesManager::~SlavesManager()
+{
+ // TODO(benh): Terminate and deallocate 'storage'.
+}
+
+
+void SlavesManager::registerOptions(Configurator* configurator)
+{
+ configurator->addOption<string>("slaves",
+ "Initial slaves that should be "
+ "considered part of this cluster "
+ "(or if using ZooKeeper a URL)", "*");
+}
+
+
+bool SlavesManager::add(const string& hostname, uint16_t port)
+{
+ // Make sure this slave is not currently deactivated or activated.
+ if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+ LOG(WARNING) << "Attempted to add deactivated slave at "
+ << hostname << ":" << port;
+ return false;
+ } else if (active.count(hostname) == 0 || active[hostname].count(port) == 0) {
+ // Get the storage system to persist the addition.
+ bool result = process::call(storage->self(), &SlavesManagerStorage::add,
+ hostname, port);
+ if (result) {
+ LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+ // Tell the master that this slave is now active (so it can
+ // allow the slave to register).
+ process::dispatch(master, &Master::activatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].insert(port);
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+bool SlavesManager::remove(const string& hostname, uint16_t port)
+{
+ // Make sure the slave is currently activated or deactivated.
+ if ((active.count(hostname) > 0 && active[hostname].count(port) > 0) ||
+ (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0)) {
+ // Get the storage system to persist the removal.
+ bool result = process::call(storage->self(), &SlavesManagerStorage::remove,
+ hostname, port);
+ if (result) {
+ LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+ if (active.count(hostname) > 0 && active[hostname].count(port) > 0) {
+ process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].erase(port);
+ if (active[hostname].size() == 0) active.erase(hostname);
+ }
+
+ if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+ inactive[hostname].erase(port);
+ if (inactive[hostname].size() == 0) inactive.erase(hostname);
+ }
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+bool SlavesManager::activate(const string& hostname, uint16_t port)
+{
+ // Make sure the slave is currently deactivated.
+ if (inactive.count(hostname) > 0 && inactive[hostname].count(port) > 0) {
+ // Get the storage system to persist the activation.
+ bool result = process::call(storage->self(), &SlavesManagerStorage::activate,
+ hostname, port);
+ if (result) {
+ LOG(INFO) << "Activating slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].insert(port);
+ inactive[hostname].erase(port);
+ if (inactive[hostname].size() == 0) inactive.erase(hostname);
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+bool SlavesManager::deactivate(const string& hostname, uint16_t port)
+{
+ // Make sure the slave is currently activated.
+ if (active.count(hostname) > 0 && active[hostname].count(port) > 0) {
+ // Get the storage system to persist the deactivation.
+ bool result = process::call(storage->self(), &SlavesManagerStorage::deactivate,
+ hostname, port);
+ if (result) {
+ LOG(INFO) << "Deactivating slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].erase(port);
+ if (active[hostname].size() == 0) active.erase(hostname);
+ inactive[hostname].insert(port);
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+void SlavesManager::updateActive(const map<string, set<uint16_t> >& _updated)
+{
+ // TODO(benh): Remove this unnecessary copy. The code below uses the
+ // [] operator to make it easier to read, but [] can't be used on
+ // something that is const, hence the copy. Ugh.
+ map<string, set<uint16_t> > updated = _updated;
+
+ // Loop through the current active slave hostname:port pairs and
+ // remove all that are not found in updated.
+ foreachpair (const string& hostname, _, active) {
+ if (updated.count(hostname) == 0) {
+ foreach (uint16_t port, active[hostname]) {
+ LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ hostname, port);
+ }
+ active.erase(hostname);
+ } else {
+ foreachcopy (uint16_t port, active[hostname]) {
+ if (updated[hostname].count(port) == 0) {
+ LOG(INFO) << "Removing slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::deactivatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].erase(port);
+ if (active[hostname].size() == 0) active.erase(hostname);
+ }
+ }
+ }
+ }
+
+ // Now loop through the updated slave hostname:port pairs and add
+ // all that are not found in active.
+ foreachpair (const string& hostname, _, updated) {
+ if (active.count(hostname) == 0) {
+ foreach (uint16_t port, updated[hostname]) {
+ LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::activatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].insert(port);
+ }
+ } else {
+ foreach (uint16_t port, updated[hostname]) {
+ if (active[hostname].count(port) == 0) {
+ LOG(INFO) << "Adding slave at " << hostname << ":" << port;
+ process::dispatch(master, &Master::activatedSlaveHostnamePort,
+ hostname, port);
+ active[hostname].insert(port);
+ }
+ }
+ }
+ }
+}
+
+
+void SlavesManager::updateInactive(const map<string, set<uint16_t> >& updated)
+{
+ inactive = updated;
+}
+
+
+Promise<HttpResponse> SlavesManager::add(const HttpRequest& request)
+{
+ // Parse the query to get out the slave hostname and port.
+ string hostname = "";
+ uint16_t port = 0;
+
+ map<string, vector<string> > pairs = tokenize::pairs(request.query, ",", "=");
+
+ if (pairs.size() != 2) {
+ LOG(WARNING) << "Malformed query string when trying to add a slave";
+ return HttpNotFoundResponse();
+ }
+
+ // Make sure there is at least a 'hostname=' and 'port='.
+ if (pairs.count("hostname") == 0) {
+ LOG(WARNING) << "Missing 'hostname' in query string"
+ << " when trying to add a slave";
+ return HttpNotFoundResponse();
+ } else if (pairs.count("port") == 0) {
+ LOG(WARNING) << "Missing 'port' in query string"
+ << " when trying to add a slave";
+ return HttpNotFoundResponse();
+ }
+
+ hostname = pairs["hostname"].front();
+
+ // Check that 'port' is valid.
+ try {
+ port = lexical_cast<uint16_t>(pairs["port"].front());
+ } catch (const bad_lexical_cast&) {
+ LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ << "' when trying to add a slave";
+ return HttpNotFoundResponse();
+ }
+
+ LOG(INFO) << "Asked to add slave at " << hostname << ":" << port;
+
+ if (add(hostname, port)) {
+ return HttpOKResponse();
+ } else {
+ return HttpInternalServerErrorResponse();
+ }
+}
+
+
+Promise<HttpResponse> SlavesManager::remove(const HttpRequest& request)
+{
+ // Parse the query to get out the slave hostname and port.
+ string hostname = "";
+ uint16_t port = 0;
+
+ // TODO(benh): Don't use tokenize::pairs to get better errors?
+ map<string, vector<string> > pairs = tokenize::pairs(request.query, ",", "=");
+
+ if (pairs.size() != 2) {
+ LOG(WARNING) << "Malformed query string when trying to remove a slave";
+ return HttpNotFoundResponse();
+ }
+
+ // Make sure there is at least a 'hostname=' and 'port='.
+ if (pairs.count("hostname") == 0) {
+ LOG(WARNING) << "Missing 'hostname' in query string"
+ << " when trying to remove a slave";
+ return HttpNotFoundResponse();
+ } else if (pairs.count("port") == 0) {
+ LOG(WARNING) << "Missing 'port' in query string"
+ << " when trying to remove a slave";
+ return HttpNotFoundResponse();
+ }
+
+ hostname = pairs["hostname"].front();
+
+ // Check that 'port' is valid.
+ try {
+ port = lexical_cast<uint16_t>(pairs["port"].front());
+ } catch (const bad_lexical_cast&) {
+ LOG(WARNING) << "Failed to parse 'port = " << pairs["port"].front()
+ << "' when trying to remove a slave";
+ return HttpNotFoundResponse();
+ }
+
+ LOG(INFO) << "Asked to remove slave at " << hostname << ":" << port;
+
+ if (remove(hostname, port)) {
+ return HttpOKResponse();
+ } else {
+ return HttpInternalServerErrorResponse();
+ }
}
Master::Master()
: MesosProcess<Master>("master"),
- active(false),
- nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+ active(false), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
{
allocatorType = "simple";
initialize();
@@ -133,8 +840,8 @@ Master::Master()
Master::Master(const Configuration& _conf)
: MesosProcess<Master>("master"),
- active(false),
- conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+ active(false), conf(_conf), nextFrameworkId(0), nextSlaveId(0),
+ nextOfferId(0)
{
allocatorType = conf.get("allocator", "simple");
initialize();
@@ -145,26 +852,25 @@ Master::~Master()
{
LOG(INFO) << "Shutting down master";
- delete allocator;
-
- foreachpair (_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks)
- delete task;
- delete framework;
+ foreachpaircopy (_, Framework* framework, frameworks) {
+ removeFramework(framework);
}
- foreachpair (_, Slave *slave, slaves) {
- delete slave;
+ foreachpaircopy (_, Slave* slave, slaves) {
+ removeSlave(slave);
}
- foreachpair (_, SlotOffer *offer, slotOffers) {
- delete offer;
- }
+ delete allocator;
+
+ CHECK(slotOffers.size() == 0);
+
+ // TODO(benh): Terminate and delete slave manager!
}
void Master::registerOptions(Configurator* configurator)
{
+ SlavesManager::registerOptions(configurator);
configurator->addOption<string>("allocator", 'a', "Allocation module name",
"simple");
configurator->addOption<bool>("root_submissions",
@@ -332,11 +1038,16 @@ void Master::operator () ()
masterId = DateUtils::currentDate() + "-" + msg.token();
LOG(INFO) << "Master ID: " << masterId;
+ // Setup slave manager.
+ slavesManager = new SlavesManager(conf, self());
+ process::spawn(slavesManager);
+
// Create the allocator (we do this after the constructor because it
// leaks 'this').
allocator = createAllocator();
- if (!allocator)
+ if (!allocator) {
LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
+ }
link(spawn(new AllocatorTimer(self())));
//link(spawn(new SharesPrinter(self())));
@@ -425,9 +1136,6 @@ void Master::initialize()
&ExitedExecutorMessage::executor_id,
&ExitedExecutorMessage::result);
- install(SH2M_HEARTBEAT, &Master::slaveHeartbeat,
- &HeartbeatMessage::slave_id);
-
install(process::EXITED, &Master::exited);
// Install HTTP request handlers.
@@ -738,23 +1446,10 @@ void Master::statusUpdateAck(const Frame
void Master::registerSlave(const SlaveInfo& slaveInfo)
{
- Slave* slave = new Slave(slaveInfo, newSlaveId(), from(), elapsed());
-
- LOG(INFO) << "Registering slave " << slave->slaveId
- << " at " << slave->pid;
-
- slaves[slave->slaveId] = slave;
- pidToSlaveId[slave->pid] = slave->slaveId;
- link(slave->pid);
-
- allocator->slaveAdded(slave);
-
- MSG<M2S_REGISTER_REPLY> out;
- out.mutable_slave_id()->MergeFrom(slave->slaveId);
- out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
- send(slave->pid, out);
+ addSlave(slaveInfo, newSlaveId(), from());
}
+
void Master::reregisterSlave(const SlaveID& slaveId,
const SlaveInfo& slaveInfo,
const vector<Task>& tasks)
@@ -772,43 +1467,35 @@ void Master::reregisterSlave(const Slave
LOG(ERROR) << "Slave re-registered with in use SlaveID!";
send(from(), process::TERMINATE);
} else {
- Slave* slave = new Slave(slaveInfo, slaveId, from(), elapsed());
-
- slaves[slave->slaveId] = slave;
- pidToSlaveId[slave->pid] = slave->slaveId;
- link(slave->pid);
-
- MSG<M2S_REREGISTER_REPLY> out;
- out.mutable_slave_id()->MergeFrom(slave->slaveId);
- out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
- send(slave->pid, out);
-
- for (int i = 0; i < tasks.size(); i++) {
- Task* task = new Task(tasks[i]);
-
- // Add the task to the slave.
- slave->addTask(task);
-
- // Try and add the task to the framework too, but since the
- // framework might not yet be connected we won't be able to
- // add them. However, when the framework connects later we
- // will add them then. We also tell this slave the current
- // framework pid for this task. Again, we do the same thing if
- // a framework currently isn't registered.
- Framework* framework = lookupFramework(task->framework_id());
- if (framework != NULL) {
- framework->addTask(task);
- MSG<M2S_UPDATE_FRAMEWORK> out;
- out.mutable_framework_id()->MergeFrom(framework->frameworkId);
- out.set_pid(framework->pid);
- send(slave->pid, out);
- } else {
- // TODO(benh): We should really put a timeout on how long we
- // keep tasks running on a slave that never have frameworks
- // reregister and claim them.
- LOG(WARNING) << "Possibly orphaned task " << task->task_id()
- << " of framework " << task->framework_id()
- << " running on slave " << slaveId;
+ Slave* slave = addSlave(slaveInfo, slaveId, from());
+ if (slave != NULL) {
+ for (int i = 0; i < tasks.size(); i++) {
+ Task* task = new Task(tasks[i]);
+
+ // Add the task to the slave.
+ slave->addTask(task);
+
+ // Try and add the task to the framework too, but since the
+ // framework might not yet be connected we won't be able to
+ // add them. However, when the framework connects later we
+ // will add them then. We also tell this slave the current
+ // framework pid for this task. Again, we do the same thing
+ // if a framework currently isn't registered.
+ Framework* framework = lookupFramework(task->framework_id());
+ if (framework != NULL) {
+ framework->addTask(task);
+ MSG<M2S_UPDATE_FRAMEWORK> out;
+ out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+ out.set_pid(framework->pid);
+ send(slave->pid, out);
+ } else {
+ // TODO(benh): We should really put a timeout on how long we
+ // keep tasks running on a slave that never have frameworks
+ // reregister and claim them.
+ LOG(WARNING) << "Possibly orphaned task " << task->task_id()
+ << " of framework " << task->framework_id()
+ << " running on slave " << slaveId;
+ }
}
}
}
@@ -941,28 +1628,36 @@ void Master::exitedExecutor(const SlaveI
}
-void Master::slaveHeartbeat(const SlaveID& slaveId)
+void Master::activatedSlaveHostnamePort(const string& hostname, uint16_t port)
{
- Slave* slave = lookupSlave(slaveId);
- if (slave != NULL) {
- slave->lastHeartbeat = elapsed();
- } else {
- LOG(WARNING) << "Received heartbeat for UNKNOWN slave "
- << slaveId << " from " << from();
- }
+ LOG(INFO) << "Master now considering a slave at "
+ << hostname << ":" << port << " as active";
+ slaveHostnamePorts[hostname].insert(port);
}
-void Master::timerTick()
+void Master::deactivatedSlaveHostnamePort(const string& hostname, uint16_t port)
{
- foreachpaircopy (_, Slave* slave, slaves) {
- if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
- LOG(INFO) << "Slave " << slave->slaveId
- << " missing heartbeats ... considering disconnected";
- removeSlave(slave);
+ if (slaveHostnamePorts.count(hostname) > 0 &&
+ slaveHostnamePorts[hostname].count(port) > 0) {
+ // Look for a connected slave and remove it.
+ foreachpair (_, Slave* slave, slaves) {
+ if (slave->info.hostname() == hostname && slave->pid.port == port) {
+ LOG(WARNING) << "Removing slave " << slave->slaveId
+ << " because it has been deactivated";
+ removeSlave(slave);
+ break;
+ }
}
+
+ slaveHostnamePorts[hostname].erase(port);
+ if (slaveHostnamePorts[hostname].size() == 0) slaveHostnamePorts.erase(hostname);
}
+}
+
+void Master::timerTick()
+{
// Check which framework filters can be expired.
foreachpair (_, Framework* framework, frameworks) {
framework->removeExpiredFilters(elapsed());
@@ -1053,604 +1748,6 @@ Promise<HttpResponse> Master::vars(const
}
-// while (true) {
-// switch (receive()) {
-
-// case NEW_MASTER_DETECTED: {
-// const MSG<NEW_MASTER_DETECTED>& msg = message();
-
-// // Check and see if we are (1) still waiting to be the active
-// // master, (2) newly active master, (3) no longer active master,
-// // or (4) still active master.
-// PID pid(msg.pid());
-
-// if (pid != self() && !active) {
-// LOG(INFO) << "Waiting to be master!";
-// } else if (pid == self() && !active) {
-// LOG(INFO) << "Acting as master!";
-// active = true;
-// } else if (pid != self() && active) {
-// LOG(FATAL) << "No longer active master ... committing suicide!";
-// } else if (pid == self() && active) {
-// LOG(INFO) << "Still acting as master!";
-// }
-// break;
-// }
-
-// case NO_MASTER_DETECTED: {
-// if (active) {
-// LOG(FATAL) << "No longer active master ... committing suicide!";
-// } else {
-// LOG(FATAL) << "No master detected (?) ... committing suicide!";
-// }
-// break;
-// }
-
-// case MASTER_DETECTION_FAILURE: {
-// LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-// break;
-// }
-
-// case F2M_REGISTER_FRAMEWORK: {
-// const MSG<F2M_REGISTER_FRAMEWORK>& msg = message();
-
-// Framework *framework =
-// new Framework(msg.framework(), newFrameworkId(), from(), elapsed());
-
-// LOG(INFO) << "Registering " << framework << " at " << framework->pid;
-
-// if (framework->info.executor().uri() == "") {
-// LOG(INFO) << framework << " registering without an executor URI";
-// MSG<M2F_ERROR> out;
-// out.set_code(1);
-// out.set_message("No executor URI given");
-// send(from(), out);
-// delete framework;
-// break;
-// }
-
-// bool rootSubmissions = conf.get<bool>("root_submissions", true);
-// if (framework->info.user() == "root" && rootSubmissions == false) {
-// LOG(INFO) << framework << " registering as root, but "
-// << "root submissions are disabled on this cluster";
-// MSG<M2F_ERROR> out;
-// out.set_code(1);
-// out.set_message("User 'root' is not allowed to run frameworks");
-// send(from(), out);
-// delete framework;
-// break;
-// }
-
-// addFramework(framework);
-// break;
-// }
-
-// case F2M_REREGISTER_FRAMEWORK: {
-// const MSG<F2M_REREGISTER_FRAMEWORK> &msg = message();
-
-// if (msg.framework_id() == "") {
-// LOG(ERROR) << "Framework re-registering without an id!";
-// MSG<M2F_ERROR> out;
-// out.set_code(1);
-// out.set_message("Missing framework id");
-// send(from(), out);
-// break;
-// }
-
-// if (msg.framework().executor().uri() == "") {
-// LOG(INFO) << "Framework " << msg.framework_id() << " re-registering "
-// << "without an executor URI";
-// MSG<M2F_ERROR> out;
-// out.set_code(1);
-// out.set_message("No executor URI given");
-// send(from(), out);
-// break;
-// }
-
-// LOG(INFO) << "Re-registering framework " << msg.framework_id()
-// << " at " << from();
-
-// if (frameworks.count(msg.framework_id()) > 0) {
-// // Using the "generation" of the scheduler allows us to keep a
-// // scheduler that got partitioned but didn't die (in ZooKeeper
-// // speak this means didn't lose their session) and then
-// // eventually tried to connect to this master even though
-// // another instance of their scheduler has reconnected. This
-// // might not be an issue in the future when the
-// // master/allocator launches the scheduler can get restarted
-// // (if necessary) by the master and the master will always
-// // know which scheduler is the correct one.
-// if (msg.generation() == 0) {
-// LOG(INFO) << "Framework " << msg.framework_id() << " failed over";
-// failoverFramework(frameworks[msg.framework_id()], from());
-// // TODO: Should we check whether the new scheduler has given
-// // us a different framework name, user name or executor info?
-// } else {
-// LOG(INFO) << "Framework " << msg.framework_id()
-// << " re-registering with an already used id "
-// << " and not failing over!";
-// MSG<M2F_ERROR> out;
-// out.set_code(1);
-// out.set_message("Framework id in use");
-// send(from(), out);
-// break;
-// }
-// } else {
-// // We don't have a framework with this ID, so we must be a newly
-// // elected Mesos master to which either an existing scheduler or a
-// // failed-over one is connecting. Create a Framework object and add
-// // any tasks it has that have been reported by reconnecting slaves.
-// Framework *framework =
-// new Framework(msg.framework(), msg.framework_id(), from(), elapsed());
-
-// // TODO(benh): Check for root submissions like above!
-
-// addFramework(framework);
-// // Add any running tasks reported by slaves for this framework.
-// foreachpair (const SlaveID& slaveId, Slave *slave, slaves) {
-// foreachpair (_, Task *task, slave->tasks) {
-// if (framework->frameworkId == task->framework_id()) {
-// framework->addTask(task);
-// }
-// }
-// }
-// }
-
-// CHECK(frameworks.count(msg.framework_id()) > 0);
-
-// // Broadcast the new framework pid to all the slaves. We have to
-// // broadcast because an executor might be running on a slave but
-// // it currently isn't running any tasks. This could be a
-// // potential scalability issue ...
-// foreachpair (_, Slave *slave, slaves) {
-// MSG<M2S_UPDATE_FRAMEWORK> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// out.set_pid(from());
-// send(slave->pid, out);
-// }
-// break;
-// }
-
-// case F2M_UNREGISTER_FRAMEWORK: {
-// const MSG<F2M_UNREGISTER_FRAMEWORK>& msg = message();
-
-// LOG(INFO) << "Asked to unregister framework " << msg.framework_id();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// if (framework->pid == from())
-// removeFramework(framework);
-// else
-// LOG(WARNING) << from() << " tried to unregister framework; "
-// << "expecting " << framework->pid;
-// }
-// break;
-// }
-
-// case F2M_RESOURCE_OFFER_REPLY: {
-// const MSG<F2M_RESOURCE_OFFER_REPLY>& msg = message();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-
-// // Copy out the task descriptions (could optimize).
-// vector<TaskDescription> tasks;
-// for (int i = 0; i < msg.task_size(); i++) {
-// tasks.push_back(msg.task(i));
-// }
-
-// SlotOffer *offer = lookupSlotOffer(msg.offer_id());
-// if (offer != NULL) {
-// processOfferReply(offer, tasks, msg.params());
-// } else {
-// // The slot offer is gone, meaning that we rescinded it, it
-// // has already been replied to, or that the slave was lost;
-// // immediately report any tasks in it as lost (it would
-// // probably be better to have better error messages here).
-// foreach (const TaskDescription &task, tasks) {
-// MSG<M2F_STATUS_UPDATE> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// TaskStatus *status = out.mutable_status();
-// status->mutable_task_id()->MergeFrom(task.task_id());
-// status->mutable_slave_id()->MergeFrom(task.slave_id());
-// status->set_state(TASK_LOST);
-// send(framework->pid, out);
-// }
-// }
-// }
-// break;
-// }
-
-// case F2M_REVIVE_OFFERS: {
-// const MSG<F2M_REVIVE_OFFERS>& msg = message();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// LOG(INFO) << "Reviving offers for " << framework;
-// framework->slaveFilter.clear();
-// allocator->offersRevived(framework);
-// }
-// break;
-// }
-
-// case F2M_KILL_TASK: {
-// const MSG<F2M_KILL_TASK>& msg = message();
-
-// LOG(INFO) << "Asked to kill task " << msg.task_id()
-// << " of framework " << msg.framework_id();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// Task *task = framework->lookupTask(msg.task_id());
-// if (task != NULL) {
-// killTask(task);
-// } else {
-// LOG(ERROR) << "Cannot kill task " << msg.task_id()
-// << " of framework " << msg.framework_id()
-// << " because it cannot be found";
-// MSG<M2F_STATUS_UPDATE> out;
-// out.mutable_framework_id()->MergeFrom(task->framework_id());
-// TaskStatus *status = out.mutable_status();
-// status->mutable_task_id()->MergeFrom(task->task_id());
-// status->mutable_slave_id()->MergeFrom(task->slave_id());
-// status->set_state(TASK_LOST);
-// send(framework->pid, out);
-// }
-// }
-// break;
-// }
-
-// case F2M_FRAMEWORK_MESSAGE: {
-// const MSG<F2M_FRAMEWORK_MESSAGE>& msg = message();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// Slave *slave = lookupSlave(msg.message().slave_id());
-// if (slave != NULL) {
-// MSG<M2S_FRAMEWORK_MESSAGE> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// out.mutable_message()->MergeFrom(msg.message());
-// send(slave->pid, out);
-// }
-// }
-// break;
-// }
-
-// case F2M_STATUS_UPDATE_ACK: {
-// const MSG<F2M_STATUS_UPDATE_ACK>& msg = message();
-
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// Slave *slave = lookupSlave(msg.slave_id());
-// if (slave != NULL) {
-// MSG<M2S_STATUS_UPDATE_ACK> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// out.mutable_slave_id()->MergeFrom(msg.slave_id());
-// out.mutable_task_id()->MergeFrom(msg.task_id());
-// send(slave->pid, out);
-// }
-// }
-// break;
-// }
-
-// case S2M_REGISTER_SLAVE: {
-// const MSG<S2M_REGISTER_SLAVE>& msg = message();
-
-// Slave* slave = new Slave(msg.slave(), newSlaveId(), from(), elapsed());
-
-// LOG(INFO) << "Registering slave " << slave->slaveId
-// << " at " << slave->pid;
-
-// slaves[slave->slaveId] = slave;
-// pidToSlaveId[slave->pid] = slave->slaveId;
-// link(slave->pid);
-
-// allocator->slaveAdded(slave);
-
-// MSG<M2S_REGISTER_REPLY> out;
-// out.mutable_slave_id()->MergeFrom(slave->slaveId);
-// out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-// send(slave->pid, out);
-// break;
-// }
-
-// case S2M_REREGISTER_SLAVE: {
-// const MSG<S2M_REREGISTER_SLAVE>& msg = message();
-
-// LOG(INFO) << "Re-registering " << msg.slave_id() << " at " << from();
-
-// if (msg.slave_id() == "") {
-// LOG(ERROR) << "Slave re-registered without a SlaveID!";
-// send(from(), TERMINATE);
-// break;
-// }
-
-// // TODO(benh): Once we support handling session expiration, we
-// // will want to handle having a slave re-register with us when
-// // we already have them recorded (i.e., the below if statement
-// // will evaluate to true).
-
-// if (lookupSlave(msg.slave_id()) != NULL) {
-// LOG(ERROR) << "Slave re-registered with in use SlaveID!";
-// send(from(), TERMINATE);
-// break;
-// }
-
-// Slave* slave = new Slave(msg.slave(), msg.slave_id(), from(), elapsed());
-
-// slaves[slave->slaveId] = slave;
-// pidToSlaveId[slave->pid] = slave->slaveId;
-// link(slave->pid);
-
-// MSG<M2S_REREGISTER_REPLY> out;
-// out.mutable_slave_id()->MergeFrom(slave->slaveId);
-// out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
-// send(slave->pid, out);
-
-// for (int i = 0; i < msg.task_size(); i++) {
-// Task *task = new Task(msg.task(i));
-// slave->addTask(task);
-
-// // Tell this slave the current framework pid for this task.
-// Framework *framework = lookupFramework(task->framework_id());
-// if (framework != NULL) {
-// framework->addTask(task);
-// MSG<M2S_UPDATE_FRAMEWORK> out;
-// out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-// out.set_pid(framework->pid);
-// send(slave->pid, out);
-// }
-// }
-
-// // TODO(benh|alig): We should put a timeout on how long we keep
-// // tasks running that never have frameworks reregister that
-// // claim them.a
-// break;
-// }
-
-// case S2M_UNREGISTER_SLAVE: {
-// const MSG<S2M_UNREGISTER_SLAVE>& msg = message();
-
-// LOG(INFO) << "Asked to unregister slave " << msg.slave_id();
-
-// // TODO(benh): Check that only the slave is asking to unregister?
-
-// Slave *slave = lookupSlave(msg.slave_id());
-// if (slave != NULL)
-// removeSlave(slave);
-// break;
-// }
-
-// case S2M_STATUS_UPDATE: {
-// const MSG<S2M_STATUS_UPDATE>& msg = message();
-
-// const TaskStatus& status = msg.status();
-
-// LOG(INFO) << "Status update: task " << status.task_id()
-// << " of framework " << msg.framework_id()
-// << " is now in state "
-// << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
-// Slave* slave = lookupSlave(status.slave_id());
-// if (slave != NULL) {
-// Framework* framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// // Pass on the (transformed) status update to the framework.
-// MSG<M2F_STATUS_UPDATE> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// out.mutable_status()->MergeFrom(status);
-// send(framework->pid, out);
-
-// // Lookup the task and see if we need to update anything locally.
-// Task *task = slave->lookupTask(msg.framework_id(), status.task_id());
-// if (task != NULL) {
-// task->set_state(status.state());
-// // Remove the task if necessary.
-// if (status.state() == TASK_FINISHED ||
-// status.state() == TASK_FAILED ||
-// status.state() == TASK_KILLED ||
-// status.state() == TASK_LOST) {
-// removeTask(task, TRR_TASK_ENDED);
-// }
-// } else {
-// LOG(WARNING) << "Status update error: couldn't lookup "
-// << "task " << status.task_id();
-// }
-// } else {
-// LOG(WARNING) << "Status update error: couldn't lookup "
-// << "framework " << msg.framework_id();
-// }
-// } else {
-// LOG(WARNING) << "Status update error: couldn't lookup slave "
-// << status.slave_id();
-// }
-// break;
-// }
-
-// case S2M_FRAMEWORK_MESSAGE: {
-// const MSG<S2M_FRAMEWORK_MESSAGE>& msg = message();
-
-// Slave *slave = lookupSlave(msg.message().slave_id());
-// if (slave != NULL) {
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// MSG<M2S_FRAMEWORK_MESSAGE> out;
-// out.mutable_framework_id()->MergeFrom(msg.framework_id());
-// out.mutable_message()->MergeFrom(msg.message());
-// send(framework->pid, out);
-// }
-// }
-// break;
-// }
-
-// case S2M_EXITED_EXECUTOR: {
-// const MSG<S2M_EXITED_EXECUTOR>&msg = message();
-
-// Slave *slave = lookupSlave(msg.slave_id());
-// if (slave != NULL) {
-// Framework *framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// LOG(INFO) << "Executor " << msg.executor_id()
-// << " of framework " << framework->frameworkId
-// << " on slave " << slave->slaveId
-// << " (" << slave->info.hostname() << ") "
-// << "exited with status " << msg.status();
-
-// // Tell the framework which tasks have been lost.
-// foreachpaircopy (_, Task* task, framework->tasks) {
-// if (task->slave_id() == slave->slaveId &&
-// task->executor_id() == msg.executor_id()) {
-// MSG<M2F_STATUS_UPDATE> out;
-// out.mutable_framework_id()->MergeFrom(task->framework_id());
-// TaskStatus *status = out.mutable_status();
-// status->mutable_task_id()->MergeFrom(task->task_id());
-// status->mutable_slave_id()->MergeFrom(task->slave_id());
-// status->set_state(TASK_LOST);
-// send(framework->pid, out);
-
-// LOG(INFO) << "Removing " << task << " because of lost executor";
-
-// removeTask(task, TRR_EXECUTOR_LOST);
-// }
-// }
-
-// // TODO(benh): Send the framework its executor's exit
-// // status? Or maybe at least have something like
-// // M2F_EXECUTOR_LOST?
-// }
-// }
-// break;
-// }
-
-// case SH2M_HEARTBEAT: {
-// const MSG<SH2M_HEARTBEAT>& msg = message();
-
-// Slave *slave = lookupSlave(msg.slave_id());
-// if (slave != NULL) {
-// slave->lastHeartbeat = elapsed();
-// } else {
-// LOG(WARNING) << "Received heartbeat for UNKNOWN slave "
-// << msg.slave_id() << " from " << from();
-// }
-// break;
-// }
-
-// case M2M_TIMER_TICK: {
-// foreachpaircopy (_, Slave *slave, slaves) {
-// if (slave->lastHeartbeat + HEARTBEAT_TIMEOUT <= elapsed()) {
-// LOG(INFO) << slave
-// << " missing heartbeats ... considering disconnected";
-// removeSlave(slave);
-// }
-// }
-
-// // Check which framework filters can be expired.
-// foreachpair (_, Framework *framework, frameworks)
-// framework->removeExpiredFilters(elapsed());
-
-// // Do allocations!
-// allocator->timerTick();
-// break;
-// }
-
-// case M2M_FRAMEWORK_EXPIRED: {
-// const MSG<M2M_FRAMEWORK_EXPIRED>&msg = message();
-
-// Framework* framework = lookupFramework(msg.framework_id());
-// if (framework != NULL) {
-// LOG(INFO) << "Framework failover timer expired, removing "
-// << framework;
-// removeFramework(framework);
-// }
-// break;
-// }
-
-// case PROCESS_EXIT: {
-// // TODO(benh): Could we get PROCESS_EXIT from a network partition?
-// LOG(INFO) << "Process exited: " << from();
-// if (pidToFrameworkId.count(from()) > 0) {
-// const FrameworkID& frameworkId = pidToFrameworkId[from()];
-// Framework* framework = lookupFramework(frameworkId);
-// if (framework != NULL) {
-// LOG(INFO) << framework << " disconnected";
-
-// // Stop sending offers here for now.
-// framework->active = false;
-
-// // Remove the framework's slot offers.
-// foreachcopy (SlotOffer* offer, framework->slotOffers) {
-// removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
-// }
-
-// framework->failoverTimer = new FrameworkFailoverTimer(self(), frameworkId);
-// link(spawn(framework->failoverTimer));
-// // removeFramework(framework);
-// }
-// } else if (pidToSlaveId.count(from()) > 0) {
-// const SlaveID& slaveId = pidToSlaveId[from()];
-// Slave* slave = lookupSlave(slaveId);
-// if (slave != NULL) {
-// LOG(INFO) << slave << " disconnected";
-// removeSlave(slave);
-// }
-// } else {
-// foreachpair (_, Framework *framework, frameworks) {
-// if (framework->failoverTimer != NULL &&
-// framework->failoverTimer->self() == from()) {
-// LOG(ERROR) << "Bad framework failover timer, removing "
-// << framework;
-// removeFramework(framework);
-// break;
-// }
-// }
-// }
-// break;
-// }
-
-// case M2M_GET_STATE: {
-// state::MasterState *state = getState();
-// MSG<M2M_GET_STATE_REPLY> out;
-// out.set_pointer((char *) &state, sizeof(state));
-// send(from(), out);
-// break;
-// }
-
-// case M2M_SHUTDOWN: {
-// LOG(INFO) << "Asked to shut down by " << from();
-// foreachpair (_, Slave *slave, slaves)
-// send(slave->pid, TERMINATE);
-// return;
-// }
-
-// case vars: {
-// LOG(INFO) << "HTTP request for 'vars'";
-
-// ostringstream out;
-
-// out <<
-// "build_date " << build::DATE << "\n" <<
-// "build_user " << build::USER << "\n" <<
-// "build_flags " << build::FLAGS << "\n" <<
-// "frameworks_count " << frameworks.size() << "\n";
-
-// // Also add the configuration values.
-// foreachpair (const string& key, const string& value, conf.getMap()) {
-// out << key << " " << value << "\n";
-// }
-
-// Process::send(from(), "response", out.str().data(), out.str().size());
-// break;
-// }
-
-// default:
-// LOG(ERROR) << "Received unknown message (" << msgid()
-// << ") from " << from();
-// break;
-// }
-// }
-// }
-
-
OfferID Master::makeOffer(Framework* framework,
const vector<SlaveResources>& resources)
{
@@ -1845,7 +1942,7 @@ void Master::launchTask(Framework* frame
// TODO: Make the error codes and messages programmer-friendly
void Master::terminateFramework(Framework* framework,
int32_t code,
- const std::string& message)
+ const string& message)
{
LOG(INFO) << "Terminating " << framework << " due to error: " << message;
@@ -1981,13 +2078,60 @@ void Master::removeFramework(Framework*
// TODO(benh): unlink(framework->pid);
pidToFrameworkId.erase(framework->pid);
- // Delete it
+ // Delete it.
frameworks.erase(framework->frameworkId);
allocator->frameworkRemoved(framework);
delete framework;
}
+Slave* Master::addSlave(const SlaveInfo& slaveInfo,
+ const SlaveID& slaveId,
+ const UPID& pid)
+{
+ // TODO(benh): Start a reverse lookup to ensure IP maps to hostname.
+
+ Slave* slave = NULL;
+
+ // Check whether all slaves are allocated ("*"), or at least
+ // whether this particular slave is.
+ bool allocated = (conf.get<string>("slaves", "*") == "*") ||
+ (slaveHostnamePorts.count(slaveInfo.hostname()) > 0 &&
+ slaveHostnamePorts[slaveInfo.hostname()].count(pid.port) > 0);
+
+ if (allocated) {
+ slave = new Slave(slaveInfo, slaveId, pid, elapsed());
+
+ LOG(INFO) << "Registering slave " << slave->slaveId
+ << " at " << slave->pid;
+
+ slaves[slave->slaveId] = slave;
+ pidToSlaveId[slave->pid] = slave->slaveId;
+ link(slave->pid);
+
+ allocator->slaveAdded(slave);
+
+ MSG<M2S_REGISTER_REPLY> out;
+ out.mutable_slave_id()->MergeFrom(slave->slaveId);
+ send(slave->pid, out);
+
+ // TODO(benh):
+ // // Ask the slaves manager to monitor this slave for us.
+ // process::dispatch(slavesManager->self(), &SlavesManager::monitor,
+ // slave->pid, slave->info, slave->slaveId);
+
+ // Set up an observer for the slave.
+ slave->observer = new SlaveObserver(slave->pid, slave->info,
+ slave->slaveId, slavesManager->self());
+ process::spawn(slave->observer);
+ } else {
+ LOG(WARNING) << "Cannot add slave at " << slaveInfo.hostname()
+ << " because not in allocated set of slaves!";
+ }
+
+ return slave;
+}
+
+
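The allocated-set check in addSlave consults slaveHostnamePorts, which is presumably populated by splitting a configured list of hostname:port pairs (note that master.hpp now includes common/tokenize.hpp). Below is a minimal, self-contained sketch of such parsing, assuming a semicolon-separated "host:port" format; the slaves string, the hostnamePorts map, and the simplified stand-in for tokenize::tokenize are illustrative, not code from this commit:

#include <stdint.h>

#include <iostream>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>

// Simplified stand-in for tokenize::tokenize from common/tokenize.hpp.
std::vector<std::string> tokenize(const std::string& s, const std::string& delims)
{
  std::vector<std::string> tokens;
  size_t offset = 0;
  while (offset < s.size()) {
    size_t i = s.find_first_not_of(delims, offset);
    if (i == std::string::npos) break;
    size_t j = s.find_first_of(delims, i);
    if (j == std::string::npos) j = s.size();
    tokens.push_back(s.substr(i, j - i));
    offset = j;
  }
  return tokens;
}

int main()
{
  // Hypothetical value of a "slaves" option; "*" would mean
  // "accept any slave" and skip this parsing entirely.
  const std::string slaves = "host1:5051;host2:5051;host2:5052";

  std::map<std::string, std::set<uint16_t> > hostnamePorts;

  const std::vector<std::string>& tokens = tokenize(slaves, ";");
  for (size_t i = 0; i < tokens.size(); i++) {
    const std::vector<std::string>& pair = tokenize(tokens[i], ":");
    if (pair.size() != 2) {
      std::cerr << "Bad slave entry: " << tokens[i] << std::endl;
      continue;
    }
    std::istringstream in(pair[1]);
    uint16_t port = 0;
    in >> port;  // NOTE: no overflow/validity checking in this sketch.
    hostnamePorts[pair[0]].insert(port);
  }

  std::cout << "host2 has " << hostnamePorts["host2"].size()
            << " allocated ports" << std::endl;  // Prints: ... 2 ...
  return 0;
}

Compiled standalone, the sketch prints "host2 has 2 allocated ports"; the real tokenizer in common/tokenize.hpp is what the master itself would use.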
// Lose all of a slave's tasks and delete the slave object
void Master::removeSlave(Slave* slave)
{
@@ -2042,6 +2186,16 @@ void Master::removeSlave(Slave* slave)
send(framework->pid, out);
}
+ // TODO(benh):
+ // // Tell the slaves manager to stop monitoring this slave for us.
+ // process::dispatch(slavesManager->self(), &SlavesManager::forget,
+ // slave->pid, slave->info, slave->slaveId);
+
+ // Kill the slave observer.
+ process::post(slave->observer->self(), process::TERMINATE);
+ process::wait(slave->observer->self());
+ delete slave->observer;
+
// TODO(benh): unlink(slave->pid);
pidToSlaveId.erase(slave->pid);
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun 5 09:19:53 2011
@@ -26,6 +26,7 @@
#include "common/fatal.hpp"
#include "common/foreach.hpp"
#include "common/resources.hpp"
+#include "common/tokenize.hpp"
#include "common/type_utils.hpp"
#include "configurator/configurator.hpp"
@@ -92,6 +93,8 @@ struct SlotOffer;
struct Framework;
struct Slave;
class Allocator;
+class Master;
+class SlavesHttpServer;
// Resources offered on a particular slave.
@@ -105,6 +108,60 @@ struct SlaveResources
};
+// A resource offer.
+struct SlotOffer
+{
+ OfferID offerId;
+ FrameworkID frameworkId;
+ std::vector<SlaveResources> resources;
+
+ SlotOffer(const OfferID& _offerId,
+ const FrameworkID& _frameworkId,
+ const std::vector<SlaveResources>& _resources)
+ : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
+};
+
+
+class SlavesManagerStorage : public process::Process<SlavesManagerStorage>
+{
+public:
+ virtual process::Promise<bool> add(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Promise<bool> remove(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Promise<bool> activate(const std::string& hostname, uint16_t port) { return true; }
+ virtual process::Promise<bool> deactivate(const std::string& hostname, uint16_t port) { return true; }
+};
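Note that this base SlavesManagerStorage is deliberately trivial: every operation immediately promises true, so a SlavesManager configured without a real backing store accepts all add/remove/activate/deactivate requests. A persistent implementation overrides these virtuals with the same signatures. A hypothetical in-memory variant, shown only to illustrate the override shape (InMemorySlavesManagerStorage and its map are invented for this sketch):

// A hypothetical in-memory storage, for illustration only; a real
// implementation would persist to an external store before promising.
class InMemorySlavesManagerStorage : public SlavesManagerStorage
{
public:
  // Record the slave as known; the base class's other operations
  // (remove, activate) are inherited unchanged.
  virtual process::Promise<bool> add(const std::string& hostname, uint16_t port)
  {
    active[hostname].insert(port);
    return true;
  }

  virtual process::Promise<bool> deactivate(const std::string& hostname, uint16_t port)
  {
    active[hostname].erase(port);
    return true;
  }

private:
  std::map<std::string, std::set<uint16_t> > active;
};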
+
+
+class SlavesManager : public process::Process<SlavesManager>
+{
+public:
+ SlavesManager(const Configuration& conf, const process::PID<Master>& _master);
+
+ virtual ~SlavesManager();
+
+ static void registerOptions(Configurator* configurator);
+
+ bool add(const std::string& hostname, uint16_t port);
+ bool remove(const std::string& hostname, uint16_t port);
+ bool activate(const std::string& hostname, uint16_t port);
+ bool deactivate(const std::string& hostname, uint16_t port);
+
+ void updateActive(const std::map<std::string, std::set<uint16_t> >& updated);
+ void updateInactive(const std::map<std::string, std::set<uint16_t> >& updated);
+
+private:
+ process::Promise<process::HttpResponse> add(const process::HttpRequest& request);
+ process::Promise<process::HttpResponse> remove(const process::HttpRequest& request);
+
+ const process::PID<Master> master;
+
+ std::map<std::string, std::set<uint16_t> > active;
+ std::map<std::string, std::set<uint16_t> > inactive;
+
+ SlavesManagerStorage* storage;
+};
+
+
class Master : public MesosProcess<Master>
{
public:
@@ -113,7 +170,7 @@ public:
virtual ~Master();
- static void registerOptions(Configurator* conf);
+ static void registerOptions(Configurator* configurator);
process::Promise<state::MasterState*> getState();
@@ -159,7 +216,8 @@ public:
const FrameworkID& frameworkId,
const ExecutorID& executorId,
int32_t result);
- void slaveHeartbeat(const SlaveID& slaveId);
+ void activatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
+ void deactivatedSlaveHostnamePort(const std::string& hostname, uint16_t port);
void timerTick();
void frameworkExpired(const FrameworkID& frameworkId);
void exited();
@@ -209,6 +267,10 @@ protected:
// reschedule slot offers for slots that were assigned to this framework
void removeFramework(Framework* framework);
+ // Add a slave.
+ Slave* addSlave(const SlaveInfo& slaveInfo, const SlaveID& slaveId,
+ const process::UPID& pid);
+
// Lose all of a slave's tasks and delete the slave object
void removeSlave(Slave* slave);
@@ -221,7 +283,11 @@ protected:
const Configuration& getConfiguration();
private:
- Configuration conf;
+ const Configuration conf;
+
+ SlavesManager* slavesManager;
+
+ boost::unordered_map<std::string, boost::unordered_set<uint16_t> > slaveHostnamePorts;
boost::unordered_map<FrameworkID, Framework*> frameworks;
boost::unordered_map<SlaveID, Slave*> slaves;
@@ -247,6 +313,138 @@ private:
};
+const double SLAVE_PONG_TIMEOUT = 15.0;
+const int MAX_SLAVE_TIMEOUTS = 5;
+
+
+class SlaveObserver : public process::Process<SlaveObserver>
+{
+public:
+ SlaveObserver(const process::UPID& _slave,
+ const SlaveInfo& _slaveInfo,
+ const SlaveID& _slaveId,
+ const process::PID<SlavesManager>& _slavesManager)
+ : slave(_slave), slaveInfo(_slaveInfo), slaveId(_slaveId),
+ slavesManager(_slavesManager), timeouts(0), pinged(false) {}
+
+ virtual ~SlaveObserver() {}
+
+protected:
+ virtual void operator () ()
+ {
+ // Send a ping some interval after we heard the last pong. If we
+ // don't hear a pong back, increment the number of timeouts for
+ // the slave and send another ping. If we eventually miss too many
+ // pongs in a row, consider the slave dead.
+ do {
+ std::cout.precision(15);
+ std::cout << self() << " elapsed: " << elapsed() << std::endl;
+ receive(SLAVE_PONG_TIMEOUT);
+ if (name() == PONG) {
+ timeouts = 0;
+ pinged = false;
+ } else if (name() == process::TIMEOUT) {
+ if (pinged) {
+ timeouts++;
+ pinged = false;
+ std::cout << self() << " missing slave PONG, timeouts = " << timeouts << std::endl;
+ }
+
+ send(slave, PING);
+ pinged = true;
+ } else if (name() == process::TERMINATE) {
+ return;
+ }
+ } while (timeouts < MAX_SLAVE_TIMEOUTS);
+
+ // Tell the slaves manager to deactivate the slave; this will take
+ // care of updating the master too.
+ while (!process::call(slavesManager, &SlavesManager::deactivate,
+ slaveInfo.hostname(), slave.port)) {
+ LOG(WARNING) << "Slave \"failed\" but can't be deactivated, retrying";
+ }
+ }
+
+private:
+ const process::UPID slave;
+ const SlaveInfo slaveInfo;
+ const SlaveID slaveId;
+ const process::PID<SlavesManager> slavesManager;
+ int timeouts;
+ bool pinged;
+};
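Given the constants above, a slave that goes completely silent is deactivated after six 15-second receive timeouts, i.e. roughly 6 x 15 = 90 seconds: the first timeout merely sends the initial PING (pinged is still false at that point), and the next five each increment timeouts until it reaches MAX_SLAVE_TIMEOUTS.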
+
+
+// A connected slave.
+struct Slave
+{
+ SlaveInfo info;
+ SlaveID slaveId;
+ process::UPID pid;
+
+ bool active; // Turns false when slave is being removed
+ double connectTime;
+ double lastHeartbeat;
+
+ Resources resourcesOffered; // Resources currently in offers
+ Resources resourcesInUse; // Resources currently used by tasks
+
+ boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
+ boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
+
+ SlaveObserver* observer;
+
+ Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
+ const process::UPID& _pid, double time)
+ : info(_info), slaveId(_slaveId), pid(_pid), active(true),
+ connectTime(time), lastHeartbeat(time) {}
+
+ ~Slave() {}
+
+ Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
+ {
+ foreachpair (_, Task* task, tasks) {
+ if (task->framework_id() == frameworkId && task->task_id() == taskId) {
+ return task;
+ }
+ }
+
+ return NULL;
+ }
+
+ void addTask(Task* task)
+ {
+ std::pair<FrameworkID, TaskID> key =
+ std::make_pair(task->framework_id(), task->task_id());
+ CHECK(tasks.count(key) == 0);
+ tasks[key] = task;
+ foreach (const Resource& resource, task->resources()) {
+ resourcesInUse += resource;
+ }
+ }
+
+ void removeTask(Task* task)
+ {
+ std::pair<FrameworkID, TaskID> key =
+ std::make_pair(task->framework_id(), task->task_id());
+ CHECK(tasks.count(key) > 0);
+ tasks.erase(key);
+ foreach (const Resource& resource, task->resources()) {
+ resourcesInUse -= resource;
+ }
+ }
+
+ Resources resourcesFree()
+ {
+ Resources resources;
+ foreach (const Resource& resource, info.resources()) {
+ resources += resource;
+ }
+ return resources - (resourcesOffered + resourcesInUse);
+ }
+};
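As a concrete example of the accounting above: a slave whose info.resources() total 8 CPUs, with 2 CPUs currently offered and 3 CPUs in use by tasks, reports resourcesFree() of 8 - (2 + 3) = 3 CPUs.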
+
+
class FrameworkFailoverTimer : public process::Process<FrameworkFailoverTimer>
{
public:
@@ -275,20 +473,6 @@ private:
};
-// A resource offer.
-struct SlotOffer
-{
- OfferID offerId;
- FrameworkID frameworkId;
- std::vector<SlaveResources> resources;
-
- SlotOffer(const OfferID& _offerId,
- const FrameworkID& _frameworkId,
- const std::vector<SlaveResources>& _resources)
- : offerId(_offerId), frameworkId(_frameworkId), resources(_resources) {}
-};
-
-
// A connected framework.
struct Framework
{
@@ -388,74 +572,6 @@ struct Framework
};
-// A connected slave.
-struct Slave
-{
- SlaveInfo info;
- SlaveID slaveId;
- process::UPID pid;
-
- bool active; // Turns false when slave is being removed
- double connectTime;
- double lastHeartbeat;
-
- Resources resourcesOffered; // Resources currently in offers
- Resources resourcesInUse; // Resources currently used by tasks
-
- boost::unordered_map<std::pair<FrameworkID, TaskID>, Task*> tasks;
- boost::unordered_set<SlotOffer*> slotOffers; // Active offers on this slave.
-
- Slave(const SlaveInfo& _info, const SlaveID& _slaveId,
- const process::UPID& _pid, double time)
- : info(_info), slaveId(_slaveId), pid(_pid), active(true),
- connectTime(time), lastHeartbeat(time) {}
-
- ~Slave() {}
-
- Task* lookupTask(const FrameworkID& frameworkId, const TaskID& taskId)
- {
- foreachpair (_, Task* task, tasks) {
- if (task->framework_id() == frameworkId && task->task_id() == taskId) {
- return task;
- }
- }
-
- return NULL;
- }
-
- void addTask(Task* task)
- {
- std::pair<FrameworkID, TaskID> key =
- std::make_pair(task->framework_id(), task->task_id());
- CHECK(tasks.count(key) == 0);
- tasks[key] = task;
- foreach (const Resource& resource, task->resources()) {
- resourcesInUse += resource;
- }
- }
-
- void removeTask(Task* task)
- {
- std::pair<FrameworkID, TaskID> key =
- std::make_pair(task->framework_id(), task->task_id());
- CHECK(tasks.count(key) > 0);
- tasks.erase(key);
- foreach (const Resource& resource, task->resources()) {
- resourcesInUse -= resource;
- }
- }
-
- Resources resourcesFree()
- {
- Resources resources;
- foreach (const Resource& resource, info.resources()) {
- resources += resource;
- }
- return resources - (resourcesOffered + resourcesInUse);
- }
-};
-
-
// Pretty-printing of SlotOffers, Tasks, Frameworks, Slaves, etc.
inline std::ostream& operator << (std::ostream& stream, const SlotOffer *o)
Modified: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun 5 09:19:53 2011
@@ -1,76 +1,80 @@
#include "messaging/messages.hpp"
-#define DEFINE_MESSAGE(name) \
+#define ALLOCATE_MESSAGE(name) \
char name[] = #name
namespace mesos { namespace internal {
// From framework to master.
-DEFINE_MESSAGE(F2M_REGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
-DEFINE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
-DEFINE_MESSAGE(F2M_REVIVE_OFFERS);
-DEFINE_MESSAGE(F2M_KILL_TASK);
-DEFINE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+ALLOCATE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+ALLOCATE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+ALLOCATE_MESSAGE(F2M_REVIVE_OFFERS);
+ALLOCATE_MESSAGE(F2M_KILL_TASK);
+ALLOCATE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(F2M_STATUS_UPDATE_ACK);
// From master to framework.
-DEFINE_MESSAGE(M2F_REGISTER_REPLY);
-DEFINE_MESSAGE(M2F_RESOURCE_OFFER);
-DEFINE_MESSAGE(M2F_RESCIND_OFFER);
-DEFINE_MESSAGE(M2F_STATUS_UPDATE);
-DEFINE_MESSAGE(M2F_LOST_SLAVE);
-DEFINE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(M2F_ERROR);
+ALLOCATE_MESSAGE(M2F_REGISTER_REPLY);
+ALLOCATE_MESSAGE(M2F_RESOURCE_OFFER);
+ALLOCATE_MESSAGE(M2F_RESCIND_OFFER);
+ALLOCATE_MESSAGE(M2F_STATUS_UPDATE);
+ALLOCATE_MESSAGE(M2F_LOST_SLAVE);
+ALLOCATE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(M2F_ERROR);
// From slave to master.
-DEFINE_MESSAGE(S2M_REGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_REREGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_UNREGISTER_SLAVE);
-DEFINE_MESSAGE(S2M_STATUS_UPDATE);
-DEFINE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(S2M_EXITED_EXECUTOR);
+ALLOCATE_MESSAGE(S2M_REGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_REREGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_UNREGISTER_SLAVE);
+ALLOCATE_MESSAGE(S2M_STATUS_UPDATE);
+ALLOCATE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(S2M_EXITED_EXECUTOR);
// From slave heart to master.
-DEFINE_MESSAGE(SH2M_HEARTBEAT);
+ALLOCATE_MESSAGE(SH2M_HEARTBEAT);
// From master to slave.
-DEFINE_MESSAGE(M2S_REGISTER_REPLY);
-DEFINE_MESSAGE(M2S_REREGISTER_REPLY);
-DEFINE_MESSAGE(M2S_RUN_TASK);
-DEFINE_MESSAGE(M2S_KILL_TASK);
-DEFINE_MESSAGE(M2S_KILL_FRAMEWORK);
-DEFINE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(M2S_UPDATE_FRAMEWORK);
-DEFINE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+ALLOCATE_MESSAGE(M2S_REGISTER_REPLY);
+ALLOCATE_MESSAGE(M2S_REREGISTER_REPLY);
+ALLOCATE_MESSAGE(M2S_RUN_TASK);
+ALLOCATE_MESSAGE(M2S_KILL_TASK);
+ALLOCATE_MESSAGE(M2S_KILL_FRAMEWORK);
+ALLOCATE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+ALLOCATE_MESSAGE(M2S_STATUS_UPDATE_ACK);
// From executor to slave.
-DEFINE_MESSAGE(E2S_REGISTER_EXECUTOR);
-DEFINE_MESSAGE(E2S_STATUS_UPDATE);
-DEFINE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(E2S_REGISTER_EXECUTOR);
+ALLOCATE_MESSAGE(E2S_STATUS_UPDATE);
+ALLOCATE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
// From slave to executor.
-DEFINE_MESSAGE(S2E_REGISTER_REPLY);
-DEFINE_MESSAGE(S2E_RUN_TASK);
-DEFINE_MESSAGE(S2E_KILL_TASK);
-DEFINE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
-DEFINE_MESSAGE(S2E_KILL_EXECUTOR);
+ALLOCATE_MESSAGE(S2E_REGISTER_REPLY);
+ALLOCATE_MESSAGE(S2E_RUN_TASK);
+ALLOCATE_MESSAGE(S2E_KILL_TASK);
+ALLOCATE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+ALLOCATE_MESSAGE(S2E_KILL_EXECUTOR);
#ifdef __sun__
// From projd to slave.
-DEFINE_MESSAGE(PD2S_REGISTER_PROJD);
-DEFINE_MESSAGE(PD2S_PROJD_READY);
+ALLOCATE_MESSAGE(PD2S_REGISTER_PROJD);
+ALLOCATE_MESSAGE(PD2S_PROJD_READY);
// From slave to projd.
-DEFINE_MESSAGE(S2PD_UPDATE_RESOURCES);
-DEFINE_MESSAGE(S2PD_KILL_ALL);
+ALLOCATE_MESSAGE(S2PD_UPDATE_RESOURCES);
+ALLOCATE_MESSAGE(S2PD_KILL_ALL);
#endif // __sun__
// From master detector to processes.
-DEFINE_MESSAGE(GOT_MASTER_TOKEN);
-DEFINE_MESSAGE(NEW_MASTER_DETECTED);
-DEFINE_MESSAGE(NO_MASTER_DETECTED);
-DEFINE_MESSAGE(MASTER_DETECTION_FAILURE);
+ALLOCATE_MESSAGE(GOT_MASTER_TOKEN);
+ALLOCATE_MESSAGE(NEW_MASTER_DETECTED);
+ALLOCATE_MESSAGE(NO_MASTER_DETECTED);
+ALLOCATE_MESSAGE(MASTER_DETECTION_FAILURE);
+
+// Generic messages.
+ALLOCATE_MESSAGE(PING);
+ALLOCATE_MESSAGE(PONG);
}} // namespace mesos { namespace internal {
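For reference, the renamed macro simply allocates a character array holding the stringized message name. As a sketch of the expansion:

  ALLOCATE_MESSAGE(PING);

becomes

  char PING[] = "PING";

with the corresponding MESSAGE(PING) in messages.hpp presumably supplying the matching declaration.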
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 09:19:53 2011
@@ -75,9 +75,6 @@ MESSAGE(S2M_STATUS_UPDATE, StatusUpdateM
MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
-// From slave heart to master.
-MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
-
// From master to slave.
MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
@@ -116,6 +113,10 @@ MESSAGE(NEW_MASTER_DETECTED, NewMasterDe
MESSAGE(NO_MASTER_DETECTED);
MESSAGE(MASTER_DETECTION_FAILURE);
+// Generic messages.
+MESSAGE(PING);
+MESSAGE(PONG);
+
// Type conversions helpful for changing between protocol buffer types
// and standard C++ types (for parameters).
Modified: incubator/mesos/trunk/src/messaging/messages.proto
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.proto?rev=1132287&r1=1132286&r2=1132287&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.proto (original)
+++ incubator/mesos/trunk/src/messaging/messages.proto Sun Jun 5 09:19:53 2011
@@ -131,7 +131,6 @@ message ReregisterSlaveMessage {
message SlaveRegisteredMessage {
required SlaveID slave_id = 1;
- required double heartbeat_interval = 2;
}