You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:00:36 UTC
svn commit: r1131711 - in /incubator/mesos/trunk/src: Makefile.in master.cpp
slave.cpp third_party/libprocess/process.cpp
third_party/libprocess/process.hpp third_party/libprocess/tuple-impl.hpp
zookeeper.cpp zookeeper.hpp
Author: benh
Date: Sun Jun 5 05:00:35 2011
New Revision: 1131711
URL: http://svn.apache.org/viewvc?rev=1131711&view=rev
Log:
Updates to keep ZooKeeper working (around a libprocess timing bug), and added support for passing doubles instead of time_t to libprocess functions.
Modified:
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/third_party/libprocess/process.cpp
incubator/mesos/trunk/src/third_party/libprocess/process.hpp
incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
incubator/mesos/trunk/src/zookeeper.cpp
incubator/mesos/trunk/src/zookeeper.hpp
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 05:00:35 2011
@@ -19,13 +19,13 @@ RUBY_HEADERS = @RUBY_HEADERS@
WITH_WEBUI = @WITH_WEBUI@
WEBUI_FLAGS = @WEBUI_FLAGS@
+WITH_ZOOKEEPER = @WITH_ZOOKEEPER@
+
LIBEV = third_party/libprocess/third_party/libev-3.8
GLOG = third_party/glog-0.3.0
GTEST = third_party/gtest-1.4.0-patched
-ZOOKEEPER = third_party/zookeeper-3.3.0/src/c
-
# Ensure that we get better debugging info.
CFLAGS += -g
CXXFLAGS += -g
@@ -47,16 +47,11 @@ CFLAGS += -Ithird_party/libprocess
CXXFLAGS += -Ithird_party/libprocess
LDFLAGS += -Lthird_party/libprocess
-# Add libev to LDFLAGS.
-LDFLAGS += -L$(LIBEV)/.libs
-
-# Add glog and gtest and zookeeper to include and lib paths
+# Add glog and gtest to include and lib paths
CXXFLAGS += -I$(GLOG)/src
CXXFLAGS += -I$(GTEST)/include
-CXXFLAGS += -I$(ZOOKEEPER)/include -I$(ZOOKEEPER)/generated -DTHREADED
LDFLAGS += -L$(GLOG)/.libs
LDFLAGS += -L$(GTEST)/lib/.libs
-LDFLAGS += -L$(ZOOKEEPER)/.libs
# Add dependency tracking to CFLAGS, CXXFLAGS.
CFLAGS += -MMD -MP
@@ -70,8 +65,16 @@ CXXFLAGS += -DBUILD_DATE="\"$$(date '+%Y
CFLAGS += -DBUILD_USER="\"$$USER\""
CXXFLAGS += -DBUILD_USER="\"$$USER\""
+# Add libev to LDFLAGS.
+LDFLAGS += -L$(LIBEV)/.libs
+
# Add libev, libprocess, pthread, and dl to LIBS.
-LIBS += -lglog -lgtest -lprocess -lev -lpthread -ldl -lzookeeper_mt
+LIBS += -lglog -lgtest -lprocess -lev -lpthread -ldl
+
+# Add zookeeper_st to LIBS if necessary.
+ifeq ($(WITH_ZOOKEEPER),1)
+ LIBS += -lzookeeper_st
+endif
NEXUS_EXES = nexus-master nexus-slave nexus-local nexus-launcher alltests \
test-framework test-executor cpp-test-framework cpp-test-executor \
@@ -92,7 +95,7 @@ NEXUS_LIBS = $(SCHED_LIB) $(EXEC_LIB) $(
MASTER_OBJ = master.o allocator_factory.o simple_allocator.o
SLAVE_OBJ = slave.o launcher.o isolation_module_factory.o \
process_based_isolation_module.o
-COMMON_OBJ = fatal.o hash_pid.o messages.o lock.o leader_detector.o url_processor.o ft_messaging.o
+COMMON_OBJ = fatal.o hash_pid.o lock.o messages.o
EXEC_LIB_OBJ = nexus_exec.o
SCHED_LIB_OBJ = nexus_sched.o nexus_local.o params.o
TEST_OBJ = tests/main.o tests/test_master.o tests/test_resources.o
@@ -105,6 +108,11 @@ ifeq ($(OS_NAME),linux)
SLAVE_OBJ += lxc_isolation_module.o
endif
+# Add zookeeper.cpp if necessary.
+ifeq ($(WITH_ZOOKEEPER),1)
+ COMMON_OBJ += zookeeper.o
+endif
+
ALL_OBJ = $(MASTER_OBJ) $(SLAVE_OBJ) $(COMMON_OBJ) \
$(SCHED_LIB_OBJ) $(EXEC_LIB_OBJ) $(TEST_OBJ)
@@ -133,7 +141,7 @@ $(NEXUS_LIBS): $(COMMON_OBJ) third_party
$(NEXUS_EXES): $(COMMON_OBJ) third_party/libprocess/libprocess.a
$(SCHED_LIB): $(SCHED_LIB_OBJ) $(MASTER_OBJ) $(SLAVE_OBJ) $(COMMON_OBJ)
- $(AR) rcs $@ $^ third_party/libprocess/libprocess.a $(GLOG)/.libs/libglog.a $(LIBEV)/.libs/libev.a $(ZOOKEEPER)/.libs/libzookeeper_mt.a
+ $(AR) rcs $@ $^ third_party/libprocess/libprocess.a $(GLOG)/.libs/libglog.a $(LIBEV)/.libs/libev.a
$(EXEC_LIB): $(EXEC_LIB_OBJ) $(COMMON_OBJ)
$(AR) rcs $@ $^
@@ -252,7 +260,6 @@ third_party:
$(MAKE) -C third_party/libprocess
$(MAKE) -C $(GLOG)
$(MAKE) -C $(GTEST)
- $(MAKE) -C $(ZOOKEEPER)
all: third_party $(NEXUS_LIBS) $(NEXUS_EXES) java python ruby
@@ -260,7 +267,6 @@ clean:
$(MAKE) -C third_party/libprocess clean
$(MAKE) -C $(GLOG) clean
$(MAKE) -C $(GTEST) clean
- $(MAKE) -C $(ZOOKEEPER) clean
rm -f $(patsubst %.o, %.d, $(ALL_OBJ))
rm -f $(ALL_OBJ)
rm -f $(NEXUS_LIBS)
@@ -276,11 +282,21 @@ clean:
rm -f swig/java/*.class
rm -f swig/python/nexus.py
rm -f swig/python/nexus.pyc
+ rm -f master_webui.o
+ rm -f master_webui.d
rm -f webui/master/swig/master_wrap.h
rm -f webui/master/swig/master_wrap.cpp
rm -f webui/master/swig/master_wrap.o
+ rm -f webui/master/swig/master_wrap.d
rm -f webui/master/swig/master.py
rm -f webui/master/swig/master.pyc
- rm -f TAGS
+ rm -f slave_webui.o
+ rm -f slave_webui.d
+ rm -f webui/slave/swig/slave_wrap.h
+ rm -f webui/slave/swig/slave_wrap.cpp
+ rm -f webui/slave/swig/slave_wrap.o
+ rm -f webui/slave/swig/slave_wrap.d
+ rm -f webui/slave/swig/slave.py
+ rm -f webui/slave/swig/slave.pyc
.PHONY: default third_party all clean java python ruby test
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:00:35 2011
@@ -1,11 +1,15 @@
-#include "allocator.hpp"
-#include "master.hpp"
-#include "master_webui.hpp"
-#include "allocator_factory.hpp"
-#include "ft_messaging.hpp"
+#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
#include <glog/logging.h>
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "allocator.hpp"
+#include "allocator_factory.hpp"
+#include "master.hpp"
+#include "master_webui.hpp"
using std::endl;
using std::max;
@@ -27,8 +31,75 @@ using namespace nexus::internal;
using namespace nexus::internal::master;
+/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
+extern string zookeeper;
+
namespace {
+#ifdef USING_ZOOKEEPER
+class MasterWatcher : public Watcher
+{
+private:
+ Master *master;
+
+public:
+ void process(ZooKeeper *zk, int type, int state, const string &path)
+ {
+ // Connected!
+ if ((state == ZOO_CONNECTED_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // Create znode for master identification.
+ string znode = "/home/nexus/master";
+ string dirname = "/home/nexus";
+ string delimiter = "/";
+ string contents = "";
+
+ int ret;
+ string result;
+
+ // Create directory path znodes as necessary.
+ size_t index = dirname.find(delimiter, 0);
+ while (index < string::npos) {
+ index = dirname.find(delimiter, index+1);
+ string prefix = dirname.substr(0, index);
+ ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
+ 0, &result);
+ if (ret != ZOK && ret != ZNODEEXISTS)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ }
+
+ // Now create znode.
+ ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
+ ZOO_EPHEMERAL, &result);
+
+ // If node already exists, update its value.
+ if (ret == ZNODEEXISTS)
+ ret = zk->set(znode, master->getPID(), -1);
+
+ if (ret != ZOK)
+ fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
+ } else if ((state == ZOO_EXPIRED_SESSION_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // TODO(benh): Reconnect if session expires. Note the Zookeeper
+ // C library retries in the case of connection timeouts,
+ // connection loss, etc. Only expired sessions require
+ // explicitly reconnecting.
+ fatal("connection to ZooKeeper expired!");
+ } else if ((state == ZOO_CONNECTING_STATE) &&
+ (type == ZOO_SESSION_EVENT)) {
+ // The client library automatically reconnects, taking into
+ // account failed servers in the connection string,
+ // appropriately handling the "herd effect", etc.
+ LOG(INFO) << "Lost Zookeeper connection. Retrying (automagically).";
+ } else {
+ fatal("unhandled ZooKeeper event!");
+ }
+ }
+
+ MasterWatcher(Master *_master) : master(_master) {}
+};
+#endif
+
// A process that periodically pings the master to check filter expiries, etc
class AllocatorTimer : public Tuple<Process>
{
@@ -111,50 +182,24 @@ public:
}
-Master::Master(const string &zk)
- : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
- nextSlotOfferId(0), allocatorType("simple"), masterId(0)
-{
- if (zk != "") {
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
- exit(1);
- }
- }
- ftMsg = FTMessaging::getInstance();
-}
+Master::Master()
+ : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
+ allocatorType("simple")
+{}
-Master::Master(const string& _allocatorType, const string &zk)
- : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
- nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
-{
- if (zk != "") {
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
- exit(1);
- }
- }
- ftMsg = FTMessaging::getInstance();
-}
+Master::Master(const string& _allocatorType)
+ : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
+ allocatorType(_allocatorType)
+{}
Master::~Master()
{
- if (isFT && leaderDetector != NULL)
- delete leaderDetector;
LOG(INFO) << "Shutting down master";
delete allocator;
foreachpair (_, Framework *framework, frameworks) {
- foreachpair(_, TaskInfo *task, framework->tasks)
+ foreachpair(_, Task *task, framework->tasks)
delete task;
delete framework;
}
@@ -172,7 +217,7 @@ state::MasterState * Master::getState()
std::ostringstream oss;
oss << self();
state::MasterState *state =
- new state::MasterState(BUILD_DATE, BUILD_USER, oss.str(), isFT);
+ new state::MasterState(BUILD_DATE, BUILD_USER, oss.str());
foreachpair (_, Slave *s, slaves) {
state::Slave *slave = new state::Slave(s->id, s->hostname, s->publicDns,
@@ -185,7 +230,7 @@ state::MasterState * Master::getState()
f->executorInfo.uri, f->resources.cpus, f->resources.mem,
f->connectTime);
state->frameworks.push_back(framework);
- foreachpair (_, TaskInfo *t, f->tasks) {
+ foreachpair (_, Task *t, f->tasks) {
state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
t->slaveId, t->state, t->resources.cpus, t->resources.mem);
framework->tasks.push_back(task);
@@ -259,43 +304,10 @@ SlotOffer * Master::lookupSlotOffer(Offe
return NULL;
}
-void Master::updateFrameworkTasks() {
- foreachpair (SlaveID sid, Slave *slave, slaves) {
- foreachpair (_, TaskInfo *task, slave->tasks) {
- updateFrameworkTasks(task);
- }
- }
-}
-
-void Master::updateFrameworkTasks(TaskInfo *task) {
- Framework *fwrk = lookupFramework(task->frameworkId);
- if (fwrk != NULL) {
- if (fwrk->tasks.find(task->id) == fwrk->tasks.end()) {
- fwrk->tasks[task->id] = task;
- fwrk->resources += task->resources;
- }
- }
-}
-
void Master::operator () ()
{
- LOG(INFO) << "Master started at nexus://" << self();
-
- if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- ostringstream lpid;
- lpid << self();
- leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
-
- string myLeaderSeq = leaderDetector->getMySeq();
- if (myLeaderSeq == "") {
- LOG(FATAL) << "Cannot proceed since new FT master sequence number could not be fetched from ZK.";
- exit(1);
- }
- masterId = lexical_cast<long>(myLeaderSeq);
- LOG(INFO) << "Master ID:" << masterId;
- }
+ LOG(INFO) << "Master started at " << self();
allocator = createAllocator();
if (!allocator)
@@ -304,16 +316,21 @@ void Master::operator () ()
link(spawn(new AllocatorTimer(self())));
//link(spawn(new SharesPrinter(self())));
+#ifdef USING_ZOOKEEPER
+ ZooKeeper *zk;
+ if (!zookeeper.empty())
+ zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
+#endif
+
while (true) {
switch (receive()) {
case F2M_REGISTER_FRAMEWORK: {
- FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
-
+ FrameworkID fid = nextFrameworkId++;
Framework *framework = new Framework(from(), fid);
unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
- framework->user,
- framework->executorInfo);
+ framework->user,
+ framework->executorInfo);
LOG(INFO) << "Registering " << framework << " at " << framework->pid;
frameworks[fid] = framework;
pidToFid[framework->pid] = fid;
@@ -325,78 +342,13 @@ void Master::operator () ()
break;
}
- case F2M_REREGISTER_FRAMEWORK: {
-
- Framework *framework = new Framework(from());
- unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
- framework->name,
- framework->user,
- framework->executorInfo);
-
- if (framework->id == "") {
- DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
- framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
- }
-
- LOG(INFO) << "Registering " << framework << " at " << framework->pid;
- frameworks[framework->id] = framework;
- pidToFid[framework->pid] = framework->id;
-
- updateFrameworkTasks();
-
- link(framework->pid);
- send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
- allocator->frameworkAdded(framework);
- if (framework->executorInfo.uri == "")
- terminateFramework(framework, 1, "No executor URI given");
-
- timeval tv;
- gettimeofday(&tv, NULL);
-
- DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-
- break;
- }
-
case F2M_UNREGISTER_FRAMEWORK: {
FrameworkID fid;
unpack<F2M_UNREGISTER_FRAMEWORK>(fid);
LOG(INFO) << "Asked to unregister framework " << fid;
Framework *framework = lookupFramework(fid);
if (framework != NULL)
- removeFramework(framework);
- break;
- }
-
- case F2M_FT_SLOT_OFFER_REPLY: {
- FrameworkID fid;
- OfferID oid;
- vector<TaskDescription> tasks;
- Params params;
- string ftId, senderStr;
- unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
- PID senderPid;
- istringstream ss(senderStr);
- ss >> senderPid;
- if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
- LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
- break;
- }
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- SlotOffer *offer = lookupSlotOffer(oid);
- if (offer != NULL) {
- processOfferReply(offer, tasks, params);
- } else {
- // The slot offer is gone, meaning that we rescinded it or that
- // the slave was lost; immediately report any tasks in it as lost
- foreach (TaskDescription &t, tasks) {
- send(framework->pid,
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
- }
- }
- } else
- DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
+ removeFramework(framework);
break;
}
@@ -408,17 +360,17 @@ void Master::operator () ()
unpack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- SlotOffer *offer = lookupSlotOffer(oid);
- if (offer != NULL) {
- processOfferReply(offer, tasks, params);
- } else {
- // The slot offer is gone, meaning that we rescinded it or that
- // the slave was lost; immediately report any tasks in it as lost
- foreach (TaskDescription &t, tasks) {
- send(framework->pid,
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
- }
- }
+ SlotOffer *offer = lookupSlotOffer(oid);
+ if (offer != NULL) {
+ processOfferReply(offer, tasks, params);
+ } else {
+ // The slot offer is gone, meaning that we rescinded it or that
+ // the slave was lost; immediately report any tasks in it as lost
+ foreach (TaskDescription &t, tasks) {
+ send(framework->pid,
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ }
+ }
}
break;
}
@@ -428,9 +380,9 @@ void Master::operator () ()
unpack<F2M_REVIVE_OFFERS>(fid);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- LOG(INFO) << "Reviving offers for " << framework;
- framework->slaveFilter.clear();
- allocator->offersRevived(framework);
+ LOG(INFO) << "Reviving offers for " << framework;
+ framework->slaveFilter.clear();
+ allocator->offersRevived(framework);
}
break;
}
@@ -441,39 +393,34 @@ void Master::operator () ()
unpack<F2M_KILL_TASK>(fid, tid);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- TaskInfo *task = framework->lookupTask(tid);
- if (task != NULL) {
- LOG(INFO) << "Asked to kill " << task << " by its framework";
- killTask(task);
- }
+ Task *task = framework->lookupTask(tid);
+ if (task != NULL) {
+ LOG(INFO) << "Asked to kill " << task << " by its framework";
+ killTask(task);
+ }
}
break;
}
- case F2M_FT_FRAMEWORK_MESSAGE: {
+ case F2M_FRAMEWORK_MESSAGE: {
FrameworkID fid;
FrameworkMessage message;
- string ftId, senderStr;
- unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
+ unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- Slave *slave = lookupSlave(message.slaveId);
- if (slave != NULL) {
- LOG(INFO) << "Sending framework message to " << slave;
- send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
- } else
- DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
- } else
- DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
+ Slave *slave = lookupSlave(message.slaveId);
+ if (slave != NULL) {
+ LOG(INFO) << "Sending framework message to " << slave;
+ send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+ }
+ }
break;
}
case S2M_REGISTER_SLAVE: {
- string slaveId = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
-
- Slave *slave = new Slave(from(), slaveId);
+ Slave *slave = new Slave(from(), nextSlaveId++);
unpack<S2M_REGISTER_SLAVE>(slave->hostname, slave->publicDns,
- slave->resources);
+ slave->resources);
LOG(INFO) << "Registering " << slave << " at " << slave->pid;
slaves[slave->id] = slave;
pidToSid[slave->pid] = slave->id;
@@ -483,73 +430,29 @@ void Master::operator () ()
break;
}
- case S2M_REREGISTER_SLAVE: {
- Slave *slave = new Slave(from());
- vector<TaskInfo> taskVec;
-
- unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
- slave->resources, taskVec);
-
- if (slave->id == "") {
- slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
- DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
- }
-
- foreach(TaskInfo &ti, taskVec) {
- TaskInfo *tip = new TaskInfo(ti);
- slave->addTask(tip);
- updateFrameworkTasks(tip);
- }
-
- //alibandali
- LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
- slaves[slave->id] = slave;
- pidToSid[slave->pid] = slave->id;
- link(slave->pid);
- send(slave->pid, pack<M2S_REREGISTER_REPLY>(slave->id));
- allocator->slaveAdded(slave);
-
- timeval tv;
- gettimeofday(&tv, NULL);
-
- DLOG(INFO) << tv.tv_sec << "." << tv.tv_usec << " STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
-
- break;
- }
-
case S2M_UNREGISTER_SLAVE: {
SlaveID sid;
unpack<S2M_UNREGISTER_SLAVE>(sid);
LOG(INFO) << "Asked to unregister slave " << sid;
Slave *slave = lookupSlave(sid);
if (slave != NULL)
- removeSlave(slave);
+ removeSlave(slave);
break;
}
- case S2M_FT_STATUS_UPDATE: {
+ case S2M_STATUS_UPDATE: {
SlaveID sid;
FrameworkID fid;
TaskID tid;
TaskState state;
string data;
- string ftId, senderStr;
-
- unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
- DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
+ unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
if (Slave *slave = lookupSlave(sid)) {
- if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework
-
- DLOG(INFO) << "FT: relaying ftId:"<< ftId << " from: "<< senderStr;
- send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
-
- if (!ftMsg->acceptMessage(ftId, senderStr)) {
- LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
- break;
- }
+ if (Framework *framework = lookupFramework(fid)) {
+ // Pass on the status update to the framework
+ send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
// Update the task state locally
- TaskInfo *task = slave->lookupTask(fid, tid);
+ Task *task = slave->lookupTask(fid, tid);
if (task != NULL) {
LOG(INFO) << "Status update: " << task << " is in state " << state;
task->state = state;
@@ -560,65 +463,11 @@ void Master::operator () ()
removeTask(task, TRR_TASK_ENDED);
}
}
- } else
- DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
- } else
- DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
-
- break;
- }
-
- case S2M_STATUS_UPDATE: {
- SlaveID sid;
- FrameworkID fid;
- TaskID tid;
- TaskState state;
- string data;
- unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
- if (Slave *slave = lookupSlave(sid)) {
- if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework
- send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
- // Update the task state locally
- TaskInfo *task = slave->lookupTask(fid, tid);
- if (task != NULL) {
- LOG(INFO) << "Status update: " << task << " is in state " << state;
- task->state = state;
- // Remove the task if it finished or failed
- if (state == TASK_FINISHED || state == TASK_FAILED ||
- state == TASK_KILLED || state == TASK_LOST) {
- LOG(INFO) << "Removing " << task << " because it's done";
- removeTask(task, TRR_TASK_ENDED);
- }
- }
- } else
- DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
- } else
- DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
- break;
+ }
+ break;
+ }
}
- case S2M_FT_FRAMEWORK_MESSAGE: {
- SlaveID sid;
- FrameworkID fid;
- FrameworkMessage message;
- string ftId, senderStr;
- unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
- Slave *slave = lookupSlave(sid);
- if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
-
- send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
-
- } else
- DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
- } else
- DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
-
- break;
- }
-
case S2M_FRAMEWORK_MESSAGE: {
SlaveID sid;
FrameworkID fid;
@@ -626,9 +475,9 @@ void Master::operator () ()
unpack<S2M_FRAMEWORK_MESSAGE>(sid, fid, message);
Slave *slave = lookupSlave(sid);
if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL)
- send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL)
+ send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
}
break;
}
@@ -640,18 +489,18 @@ void Master::operator () ()
unpack<S2M_LOST_EXECUTOR>(sid, fid, status);
Slave *slave = lookupSlave(sid);
if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- ostringstream oss;
- if (status == -1) {
- oss << "Executor on " << slave << " (" << slave->hostname
- << ") disconnected";
- } else {
- oss << "Executor on " << slave << " (" << slave->hostname
- << ") exited with status " << status;
- }
- terminateFramework(framework, status, oss.str());
- }
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+ ostringstream oss;
+ if (status == -1) {
+ oss << "Executor on " << slave << " (" << slave->hostname
+ << ") disconnected";
+ } else {
+ oss << "Executor on " << slave << " (" << slave->hostname
+ << ") exited with status " << status;
+ }
+ terminateFramework(framework, status, oss.str());
+ }
}
break;
}
@@ -661,43 +510,19 @@ void Master::operator () ()
unpack<SH2M_HEARTBEAT>(sid);
Slave *slave = lookupSlave(sid);
if (slave != NULL)
- //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
- ;
+ //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
+ ;
else
- LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
- << " from " << from();
+ LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
+ << " from " << from();
break;
}
case M2M_TIMER_TICK: {
LOG(INFO) << "Allocator timer tick";
foreachpair (_, Framework *framework, frameworks)
- framework->removeExpiredFilters();
+ framework->removeExpiredFilters();
allocator->timerTick();
-
- // int cnts = 0;
- // foreachpair(_, Framework *framework, frameworks) {
- // DLOG(INFO) << (cnts++) << " resourceInUse:" << framework->resources;
- // }
- break;
- }
-
- case FT_RELAY_ACK: {
- string ftId;
- string origPidStr;
- unpack<FT_RELAY_ACK>(ftId, origPidStr);
-
- DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
-
- PID origPid;
- istringstream iss(origPidStr);
- if (!(iss >> origPid)) {
- cerr << "FT: Failed to resolve PID for originator: " << origPidStr << endl;
- break;
- }
-
- send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
-
break;
}
@@ -742,8 +567,7 @@ void Master::operator () ()
OfferID Master::makeOffer(Framework *framework,
const vector<SlaveResources>& resources)
{
- OfferID oid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlotOfferId++);
-
+ OfferID oid = nextSlotOfferId++;
SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
slotOffers[offer->id] = offer;
framework->addOffer(offer);
@@ -757,7 +581,7 @@ OfferID Master::makeOffer(Framework *fra
Params params;
params.set("cpus", r.resources.cpus);
params.set("mem", r.resources.mem);
- SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
+ SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
offers.push_back(offer);
}
send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -856,13 +680,13 @@ void Master::launchTask(Framework *f, co
Resources res(params.getInt32("cpus", -1),
params.getInt64("mem", -1));
Slave *slave = lookupSlave(t.slaveId);
- TaskInfo *task = f->addTask(t.taskId, t.name, slave->id, res);
+ Task *task = f->addTask(t.taskId, t.name, slave->id, res);
LOG(INFO) << "Launching " << task << " on " << slave;
slave->addTask(task);
allocator->taskAdded(task);
send(slave->pid, pack<M2S_RUN_TASK>(
f->id, t.taskId, f->name, f->user, f->executorInfo,
- t.name, t.arg, t.params, (string)f->pid));
+ t.name, t.arg, t.params));
}
@@ -872,7 +696,7 @@ void Master::rescindOffer(SlotOffer *off
}
-void Master::killTask(TaskInfo *task)
+void Master::killTask(Task *task)
{
LOG(INFO) << "Killing " << task;
Framework *framework = lookupFramework(task->frameworkId);
@@ -941,8 +765,8 @@ void Master::removeFramework(Framework *
send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
// Remove pointers to the framework's tasks in slaves
- unordered_map<TaskID, TaskInfo *> tasksCopy = framework->tasks;
- foreachpair (_, TaskInfo *task, tasksCopy) {
+ unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
+ foreachpair (_, Task *task, tasksCopy) {
Slave *slave = lookupSlave(task->slaveId);
CHECK(slave != NULL);
removeTask(task, TRR_FRAMEWORK_LOST);
@@ -971,8 +795,8 @@ void Master::removeSlave(Slave *slave)
// TODO: Notify allocator that a slave removal is beginning?
// Remove pointers to slave's tasks in frameworks, and send status updates
- unordered_map<pair<FrameworkID, TaskID>, TaskInfo *> tasksCopy = slave->tasks;
- foreachpair (_, TaskInfo *task, tasksCopy) {
+ unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
+ foreachpair (_, Task *task, tasksCopy) {
Framework *framework = lookupFramework(task->frameworkId);
CHECK(framework != NULL);
send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
@@ -1013,7 +837,7 @@ void Master::removeSlave(Slave *slave)
// Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeTask(TaskInfo *task, TaskRemovalReason reason)
+void Master::removeTask(Task *task, TaskRemovalReason reason)
{
Framework *framework = lookupFramework(task->frameworkId);
Slave *slave = lookupSlave(task->slaveId);
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:00:35 2011
@@ -1,17 +1,19 @@
+#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
+
#include <getopt.h>
-#include <fstream>
-#include <algorithm>
+#ifdef USING_ZOOKEEPER
+#include <zookeeper.hpp>
+#endif
+
+#include "isolation_module_factory.hpp"
#include "slave.hpp"
#include "slave_webui.hpp"
-#include "isolation_module_factory.hpp"
-#include "url_processor.hpp"
-
-#define FT_TIMEOUT 10
using std::list;
using std::make_pair;
using std::ostringstream;
+using std::istringstream;
using std::pair;
using std::queue;
using std::string;
@@ -25,13 +27,67 @@ using namespace nexus;
using namespace nexus::internal;
using namespace nexus::internal::slave;
+
// There's no gethostbyname2 on Solaris, so fake it by calling gethostbyname
#ifdef __sun__
#define gethostbyname2(name, _) gethostbyname(name)
#endif
+
+/* List of ZooKeeper host:port pairs (from slave_main.cpp/local.cpp). */
+extern string zookeeper;
+
namespace {
+#ifdef USING_ZOOKEEPER
+class SlaveWatcher : public Watcher
+{
+private:
+ Slave *slave;
+
+public:
+ void process(ZooKeeper *zk, int type, int state, const string &path)
+ {
+ if ((state == ZOO_CONNECTED_STATE) &&
+ ((type == ZOO_SESSION_EVENT) || (type == ZOO_CREATED_EVENT))) {
+ // Lookup master PID.
+ string znode = "/home/nexus/master";
+ int ret;
+
+ // Check if znode exists, if not, just return and wait.
+ ret = zk->exists(znode, true, NULL);
+
+ if (ret == ZNONODE)
+ return;
+
+ if (ret != ZOK)
+ fatal("failed to get %s! (%s)", znode.c_str(), zerror(ret));
+
+ string result;
+ ret = zk->get(znode, false, &result, NULL);
+
+ if (ret != ZOK)
+ fatal("failed to get %s! (%s)", znode.c_str(), zerror(ret));
+
+ PID master;
+
+ istringstream iss(result);
+ if (!(iss >> master))
+ fatal("bad data at %s!", znode.c_str());
+
+ // TODO(benh): HACK! Don't just set field in slave instance!
+ slave->master = master;
+
+ Process::post(slave->getPID(), S2S_GOT_MASTER);
+ } else {
+ fatal("unhandled ZooKeeper event!");
+ }
+ }
+
+ SlaveWatcher(Slave *_slave) : slave(_slave) {}
+};
+#endif
+
// Periodically sends heartbeats to the master
class Heart : public Tuple<Process>
{
@@ -63,49 +119,18 @@ public:
} /* namespace */
-Slave::Slave(const string &_master, Resources _resources, bool _local)
- : leaderDetector(NULL),
- resources(_resources), local(_local), id(""),
- isolationType("process"), isolationModule(NULL), slaveLeaderListener(this, getPID())
-{
- ftMsg = FTMessaging::getInstance();
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- isFT = false;
- istringstream iss(urlPair.second);
- istringstream iss2(_master);
- if (!((iss >> master) || (iss2 >> master) )) {
- cerr << "Failed to resolve master PID " << urlPair.second << endl;
- exit(1);
- }
- }
-}
+// Creates a slave whose initial master PID is _master, using the default
+// "process" isolation type. With ZooKeeper, SlaveWatcher may later
+// overwrite 'master'. 'id' starts at -1 (presumably "not yet registered")
+// until M2S_REGISTER_REPLY supplies the real slave ID.
+Slave::Slave(const PID &_master, Resources _resources, bool _local)
+ : master(_master), resources(_resources), local(_local), id(-1),
+ isolationType("process"), isolationModule(NULL)
+{}
-Slave::Slave(const string &_master, Resources _resources, bool _local,
- const string &_isolationType)
- : leaderDetector(NULL),
- resources(_resources), local(_local), id(""),
- isolationType(_isolationType), isolationModule(NULL), slaveLeaderListener(this, getPID())
-{
- ftMsg = FTMessaging::getInstance();
- pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
- if (urlPair.first == UrlProcessor::ZOO) {
- isFT = true;
- zkServers = urlPair.second;
- } else {
- isFT = false;
- istringstream iss(urlPair.second);
- istringstream iss2(_master);
- if (!((iss >> master) || (iss2 >> master) )) {
- cerr << "Failed to resolve master PID " << urlPair.second << endl;
- exit(1);
- }
- }
-}
+// Same as the three-argument constructor, but with an explicit isolation
+// type instead of the default "process". 'id' starts at -1 until
+// M2S_REGISTER_REPLY supplies the real slave ID.
+Slave::Slave(const PID &_master, Resources _resources, bool _local,
+ const string& _isolationType)
+ : master(_master), resources(_resources), local(_local), id(-1),
+ isolationType(_isolationType), isolationModule(NULL)
+{}
+
Slave::~Slave() {}
@@ -125,7 +150,7 @@ state::SlaveState *Slave::getState()
f->executorInfo.uri, f->executorStatus, f->resources.cpus,
f->resources.mem);
state->frameworks.push_back(framework);
- foreachpair(_, TaskInfo *t, f->tasks) {
+ foreachpair(_, Task *t, f->tasks) {
state::Task *task = new state::Task(t->id, t->name, t->state,
t->resources.cpus, t->resources.mem);
framework->tasks.push_back(task);
@@ -140,21 +165,6 @@ void Slave::operator () ()
{
LOG(INFO) << "Slave started at " << self();
- if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
- leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
- leaderDetector->setListener(&slaveLeaderListener); // use this instead of constructor to avoid race condition
-
- string leaderPidStr = leaderDetector->getCurrentLeaderPID();
- string leaderSeq = leaderDetector->getCurrentLeaderSeq();
- LOG(INFO) << "Detected leader at " << leaderPidStr << " with ephemeral id:" << leaderSeq;
-
- istringstream iss(leaderPidStr);
- if (!(iss >> master)) {
- cerr << "Failed to resolve master PID " << leaderPidStr << endl;
- }
- }
-
// Get our hostname
char buf[256];
gethostname(buf, sizeof(buf));
@@ -169,12 +179,13 @@ void Slave::operator () ()
publicDns = hostname;
}
- LOG(INFO) << "Connecting to Nexus master at " << master;
- link(master);
-
- ftMsg->setMasterPid(master);
-
- send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+#ifdef USING_ZOOKEEPER
+  // If a ZooKeeper ensemble is configured, SlaveWatcher discovers the master
+  // and posts S2S_GOT_MASTER. Otherwise fall back to the master PID passed to
+  // the constructor, exactly as the non-ZooKeeper build does; without this
+  // the slave would never register.
+  // NOTE(review): no visible path deletes zk or its SlaveWatcher when this
+  // loop returns -- confirm ownership.
+  ZooKeeper *zk = NULL;
+  if (!zookeeper.empty())
+    zk = new ZooKeeper(zookeeper, 10000, new SlaveWatcher(this));
+  else
+    send(self(), pack<S2S_GOT_MASTER>());
+#else
+  send(self(), pack<S2S_GOT_MASTER>());
+#endif
FrameworkID fid;
TaskID tid;
@@ -184,7 +195,14 @@ void Slave::operator () ()
string data;
while (true) {
- switch (receive(FT_TIMEOUT)) {
+ switch (receive()) {
+ case S2S_GOT_MASTER: {
+ LOG(INFO) << "Connecting to Nexus master at " << master;
+ link(master);
+ send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
+ break;
+ }
+
case M2S_REGISTER_REPLY: {
unpack<M2S_REGISTER_REPLY>(this->id);
LOG(INFO) << "Registered with master; given slave ID " << this->id;
@@ -195,22 +213,11 @@ void Slave::operator () ()
break;
}
- case M2S_REREGISTER_REPLY: {
- FrameworkID tmpfid;
- unpack<M2S_REGISTER_REPLY>(tmpfid);
- LOG(INFO) << "RE-registered with master; given slave ID " << tmpfid << " had "<< this->id;
- if (this->id == "")
- this->id = tmpfid;
- link(spawn(new Heart(master, getPID(), this->id)));
- break;
- }
-
case M2S_RUN_TASK: {
- FrameworkID fid;
- string fwName, user, taskName, taskArg, fwPidStr;
+ string fwName, user, taskName, taskArg;
ExecutorInfo execInfo;
unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
- taskName, taskArg, params, fwPidStr);
+ taskName, taskArg, params);
LOG(INFO) << "Got assigned task " << fid << ":" << tid;
Resources res;
res.cpus = params.getInt32("cpus", -1);
@@ -218,15 +225,12 @@ void Slave::operator () ()
Framework *framework = getFramework(fid);
if (framework == NULL) {
// Framework not yet created on this node - create it
- PID fwPid;
- istringstream ss(fwPidStr);
- ss >> fwPid;
- framework = new Framework(fid, fwName, user, execInfo, fwPid);
+ framework = new Framework(fid, fwName, user, execInfo);
frameworks[fid] = framework;
isolationModule->frameworkAdded(framework);
isolationModule->startExecutor(framework);
}
- TaskInfo *task = framework->addTask(tid, taskName, res);
+ Task *task = framework->addTask(tid, taskName, res);
Executor *executor = getExecutor(fid);
if (executor) {
send(executor->pid,
@@ -256,24 +260,6 @@ void Slave::operator () ()
break;
}
- case M2S_FT_FRAMEWORK_MESSAGE: {
- string ftId, origPid;
- unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
-
- if (!ftMsg->acceptMessageAck(ftId, origPid))
- break;
-
- DLOG(INFO) << "FT: Received message with id: " << ftId;
-
- if (Executor *ex = getExecutor(fid)) {
- send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
- }
- // TODO(matei): If executor is not started, queue framework message?
- // (It's probably okay to just drop it since frameworks can have
- // the executor send a message to the master to say when it's ready.)
- break;
- }
-
case M2S_FRAMEWORK_MESSAGE: {
unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
if (Executor *ex = getExecutor(fid)) {
@@ -333,11 +319,7 @@ void Slave::operator () ()
}
}
// Pass on the update to the master
- if (isFT) {
- string ftId = ftMsg->getNextId();
- ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
- } else
- send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+ send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
break;
}
@@ -345,14 +327,7 @@ void Slave::operator () ()
unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
// Set slave ID in case framework omitted it
message.slaveId = this->id;
- /*
- if (isFT) {
- string ftId = ftMsg->getNextId();
- ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
- } else
- send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
- */
- send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+ send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
break;
}
@@ -366,16 +341,12 @@ void Slave::operator () ()
if (from() == master) {
// TODO: Fault tolerance!
- if (isFT)
- LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected.";
- else
- {
- LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
- if (isolationModule != NULL)
- delete isolationModule;
- // TODO: Shut down executors?
- return;
- }
+ LOG(ERROR) << "Master disconnected! Committing suicide ...";
+ // TODO(matei): Add support for factory style destroy of objects!
+ if (isolationModule != NULL)
+ delete isolationModule;
+ // TODO: Shut down executors?
+ return;
}
foreachpair (_, Executor *ex, executors) {
@@ -400,7 +371,6 @@ void Slave::operator () ()
return;
}
-
case S2S_SHUTDOWN: {
LOG(INFO) << "Asked to shut down by " << from();
// TODO(matei): Add support for factory style destroy of objects!
@@ -410,57 +380,6 @@ void Slave::operator () ()
return;
}
- case LE_NEWLEADER: {
- LOG(INFO) << "Slave got notified of new leader " << from();
- string newLeader;
- unpack<LE_NEWLEADER>(newLeader);
- istringstream iss(newLeader);
- if (!(iss >> master)) {
- cerr << "Failed to resolve master PID " << newLeader << endl;
- break;
- }
-
- LOG(INFO) << "Connecting to Nexus master at " << master;
- link(master);
-
- ftMsg->setMasterPid(master);
-
- // reconstruct resourcesInUse for the master
- // alig: do I need to include queuedTasks in this number? Don't think so.
- Resources resourcesInUse;
- vector<TaskInfo> taskVec;
-
- foreachpair(_, Framework *framework, frameworks) {
- foreachpair(_, TaskInfo *task, framework->tasks) {
- resourcesInUse += task->resources;
- TaskInfo ti = *task;
- ti.slaveId = id;
- taskVec.push_back(ti);
- }
- }
-
- if (id != "") // actual re-register
- send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
- else // slave started before master
- send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
-
- break;
- }
-
- case FT_RELAY_ACK: {
- string ftId, senderStr;
- unpack<FT_RELAY_ACK>(ftId, senderStr);
-
- DLOG(INFO) << "FT: got final ack for " << ftId;
-
- ftMsg->gotAck(ftId);
- break;
- }
-
- case PROCESS_TIMEOUT: {
- ftMsg->sendOutstanding();
- break;
- }
default: {
LOG(ERROR) << "Received unknown message ID " << msgid()
<< " from " << from();
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.cpp Sun Jun 5 05:00:35 2011
@@ -1234,7 +1234,7 @@ public:
}
- void receive(Process *process, time_t secs)
+ void receive(Process *process, double secs)
{
assert(process != NULL);
if (secs > 0) {
@@ -1270,7 +1270,7 @@ public:
}
}
#else
- void receive(Process *process, time_t secs)
+ void receive(Process *process, double secs)
{
//cout << "ProcessManager::receive" << endl;
assert(process != NULL);
@@ -1338,7 +1338,7 @@ public:
}
- void pause(Process *process, time_t secs)
+ void pause(Process *process, double secs)
{
assert(process != NULL);
@@ -1351,7 +1351,7 @@ public:
}
}
#else
- void pause(Process *process, time_t secs)
+ void pause(Process *process, double secs)
{
assert(process != NULL);
@@ -1883,8 +1883,10 @@ static void handle_async(struct ev_loop
{
if (update_timer) {
if (!timers->empty()) {
- timer_watcher.repeat = timers->begin()->first - ev_now(loop) > 0 ? : 0;
- if (timer_watcher.repeat == 0) {
+ timer_watcher.repeat = timers->begin()->first - ev_now(loop);
+ /* If timer has elapsed feed the event immediately. */
+ if (timer_watcher.repeat <= 0) {
+ timer_watcher.repeat = 0;
ev_feed_event(loop, &timer_watcher, EV_TIMEOUT);
} else {
ev_timer_again(loop, &timer_watcher);
@@ -2961,7 +2963,7 @@ void Process::send(const PID &to, MSGID
}
-MSGID Process::receive(time_t secs)
+MSGID Process::receive(double secs)
{
//cout << "Process::receive(" << secs << ")" << endl;
/* Free current message. */
@@ -3036,7 +3038,7 @@ MSGID Process::receive(time_t secs)
MSGID Process::call(const PID &to, MSGID id,
- const char *data, size_t length, time_t secs)
+ const char *data, size_t length, double secs)
{
send(to, id, data, length);
return receive(secs);
@@ -3057,7 +3059,7 @@ const char * Process::body(size_t *lengt
}
-void Process::pause(time_t secs)
+void Process::pause(double secs)
{
#ifdef USE_LITHE
/* TODO(benh): Handle non-libprocess task/ctx (i.e., proc_thread below). */
Modified: incubator/mesos/trunk/src/third_party/libprocess/process.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/process.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/process.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/process.hpp Sun Jun 5 05:00:35 2011
@@ -177,7 +177,7 @@ protected:
MSGID receive();
/* Blocks for message at most specified seconds. */
- MSGID receive(time_t);
+ MSGID receive(double secs);
/* Sends a message to PID and then blocks for a message indefinitely. */
MSGID call(const PID &, MSGID);
@@ -186,13 +186,13 @@ protected:
MSGID call(const PID &, MSGID, const char *data, size_t length);
/* Sends, and then blocks for a message at most specified seconds. */
- MSGID call(const PID &, MSGID, const char *data, size_t length, time_t);
+ MSGID call(const PID &, MSGID, const char *data, size_t length, double secs);
/* Returns pointer and length of body of last dequeued (current) message. */
const char * body(size_t *length);
/* Blocks at least specified seconds (may block longer). */
- void pause(time_t);
+ void pause(double secs);
/* Links with the specified PID. */
PID link(const PID &);
Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun 5 05:00:35 2011
@@ -583,7 +583,7 @@ protected:
return receive(0);
}
- MSGID receive(time_t secs)
+ MSGID receive(double secs)
{
return Process::receive(secs);
}
@@ -613,7 +613,7 @@ protected:
}
template <MSGID ID>
- MSGID call(const PID &to, const tuple<ID> &r, time_t secs)
+ MSGID call(const PID &to, const tuple<ID> &r, double secs)
{
send(to, r);
return receive(secs);
@@ -740,13 +740,13 @@ protected:
}
template <MSGID ID>
- MSGID call(const PID &to, time_t secs)
+ MSGID call(const PID &to, double secs)
{
return call(to, pack<ID>(), secs);
}
template <MSGID ID>
- MSGID call(const PID &to, typename field<0, ID>::type t0, time_t secs)
+ MSGID call(const PID &to, typename field<0, ID>::type t0, double secs)
{
return call(to, pack<ID>(t0), secs);
}
@@ -755,7 +755,7 @@ protected:
MSGID call(const PID &to,
typename field<0, ID>::type t0,
typename field<1, ID>::type t1,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1), secs);
}
@@ -765,7 +765,7 @@ protected:
typename field<0, ID>::type t0,
typename field<1, ID>::type t1,
typename field<2, ID>::type t2,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2), secs);
}
@@ -776,7 +776,7 @@ protected:
typename field<1, ID>::type t1,
typename field<2, ID>::type t2,
typename field<3, ID>::type t3,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3), secs);
}
@@ -788,7 +788,7 @@ protected:
typename field<2, ID>::type t2,
typename field<3, ID>::type t3,
typename field<4, ID>::type t4,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4), secs);
}
@@ -801,7 +801,7 @@ protected:
typename field<3, ID>::type t3,
typename field<4, ID>::type t4,
typename field<5, ID>::type t5,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4, t5), secs);
}
@@ -815,7 +815,7 @@ protected:
typename field<4, ID>::type t4,
typename field<5, ID>::type t5,
typename field<6, ID>::type t6,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6), secs);
}
@@ -830,7 +830,7 @@ protected:
typename field<5, ID>::type t5,
typename field<6, ID>::type t6,
typename field<7, ID>::type t7,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7), secs);
}
@@ -846,7 +846,7 @@ protected:
typename field<6, ID>::type t6,
typename field<7, ID>::type t7,
typename field<8, ID>::type t8,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7, t8), secs);
}
@@ -863,7 +863,7 @@ protected:
typename field<7, ID>::type t7,
typename field<8, ID>::type t8,
typename field<9, ID>::type t9,
- time_t secs)
+ double secs)
{
return call(to, pack<ID>(t0, t1, t2, t3, t4, t5, t6, t7, t8, t9), secs);
}
Modified: incubator/mesos/trunk/src/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.cpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/zookeeper.cpp Sun Jun 5 05:00:35 2011
@@ -41,6 +41,7 @@ enum {
REMOVE, // Perform an asynchronous remove (delete).
EXISTS, // Perform an asysnchronous exists.
GET, // Perform an asynchronous get.
+ SET, // Perform an asynchronous set.
};
@@ -104,6 +105,18 @@ struct GetCall
};
+/*
+ * Set "message" for performing ZooKeeper::set. Only a pointer to this
+ * struct is sent in the SET message body.
+ */
+struct SetCall
+{
+ /* Result code: set by setCompletion, by the SET handler when zoo_aset
+    fails immediately, or ZSYSTEMERROR when the call does not complete. */
+ int ret;
+ /* Point at ZooKeeper::set's arguments, which stay alive because
+    ZooKeeper::set waits for the request to finish before returning. */
+ const string *path;
+ const string *data;
+ /* Expected znode version passed to zoo_aset (-1 skips the check). */
+ int version;
+ /* Process that sent SET; COMPLETED is sent back to it. */
+ PID from;
+ /* Process that issued zoo_aset; used by setCompletion to send COMPLETED. */
+ ZooKeeperProcess *zooKeeperProcess;
+};
+
+
/* PID of singleton instance of WatcherProcessManager. */
PID manager;
@@ -333,12 +346,26 @@ private:
getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
}
+  /*
+   * Completion callback given to zoo_aset by the SET handler. 'data' is the
+   * SetCall registered with that request: record ZooKeeper's result code and
+   * wake the process that issued SET. The returned Stat is not used.
+   */
+  static void setCompletion(int ret, const Stat *stat, const void *data)
+  {
+    SetCall *pending = static_cast<SetCall *>(const_cast<void *>(data));
+    pending->ret = ret;
+    pending->zooKeeperProcess->send(pending->from, COMPLETED);
+  }
+
void prepare(int *fd, int *ops, timeval *tv)
{
int interest = 0;
int ret = zookeeper_interest(zh, fd, &interest, tv);
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE ||
+ ret == ZCONNECTIONLOSS ||
+ ret == ZOPERATIONTIMEOUT)
+ return;
+
if (ret != ZOK)
fatal("zookeeper_interest failed! (%s)", zerror(ret));
@@ -365,6 +392,11 @@ private:
int ret = zookeeper_process(zh, events);
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE ||
+ ret == ZCONNECTIONLOSS)
+ return;
+
if (ret != ZOK && ret != ZNOTHING)
fatal("zookeeper_process failed! (%s)", zerror(ret));
}
@@ -392,7 +424,10 @@ protected:
// TODO(benh): If tv is 0, invoke await and ignore queued messages?
- if (!await(fd, ops, tv, false)) {
+ if (await(fd, ops, tv, false)) {
+ // No enqueued messages and either data available on fd or timer expired.
+ process(fd, ops);
+ } else {
// TODO(benh): Don't handle incoming "calls" until we are connected!
switch (receive()) {
case CREATE: {
@@ -452,6 +487,22 @@ protected:
}
break;
}
+ case SET: {
+ SetCall *setCall =
+ *reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
+ setCall->from = from();
+ setCall->zooKeeperProcess = this;
+ int ret = zoo_aset(zh, setCall->path->c_str(),
+ setCall->data->data(),
+ setCall->data->size(),
+ setCall->version,
+ setCompletion, setCall);
+ if (ret != ZOK) {
+ setCall->ret = ret;
+ send(setCall->from, COMPLETED);
+ }
+ break;
+ }
case TERMINATE: {
return;
}
@@ -459,8 +510,6 @@ protected:
fatal("unexpected interruption during await");
}
}
- } else {
- process(fd, ops);
}
}
}
@@ -673,6 +722,46 @@ int ZooKeeper::get(const string &path,
}
+int ZooKeeper::set(const string &path,
+                   const string &data,
+                   int version)
+{
+  // Describe the request. ret, from and zooKeeperProcess are filled in while
+  // the ZooKeeperProcess services it.
+  SetCall request;
+  request.path = &path;
+  request.data = &data;
+  request.version = version;
+
+  // Helper process that sends SET to the ZooKeeperProcess and blocks for the
+  // reply. Only the SetCall pointer travels in the message body; the SET
+  // handler dereferences it as a SetCall **.
+  class SetRequester : public Process
+  {
+  public:
+    SetRequester(ZooKeeperProcess *_target, SetCall *_request)
+      : target(_target), request(_request) {}
+
+  protected:
+    void operator () ()
+    {
+      const char *payload = reinterpret_cast<char *>(&request);
+      if (call(target->getPID(), SET, payload, sizeof(SetCall *)) != COMPLETED)
+        request->ret = ZSYSTEMERROR;
+    }
+
+  private:
+    ZooKeeperProcess *target;
+    SetCall *request;
+  };
+
+  SetRequester requester(static_cast<ZooKeeperProcess *>(impl), &request);
+
+  // Run the helper and wait for it, so that 'request' (and the path/data it
+  // points at) outlive the whole exchange.
+  Process::wait(Process::spawn(&requester));
+
+  return request.ret;
+}
+
+
const char * ZooKeeper::error(int ret) const
{
return zerror(ret);
Modified: incubator/mesos/trunk/src/zookeeper.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/zookeeper.hpp?rev=1131711&r1=1131710&r2=1131711&view=diff
==============================================================================
--- incubator/mesos/trunk/src/zookeeper.hpp (original)
+++ incubator/mesos/trunk/src/zookeeper.hpp Sun Jun 5 05:00:35 2011
@@ -230,6 +230,26 @@ public:
Stat *stat);
/**
+ * \brief sets the data associated with a node.
+ *
+ * Blocks until the internal ZooKeeper process reports the request as
+ * completed (or the request fails).
+ *
+ * \param path the name of the node. Expressed as a file name with slashes
+ * separating ancestors of the node.
+ * \param data the data to be written to the node.
+ * \param version the expected version of the node. The function will fail if
+ * the actual version of the node does not match the expected version. If -1 is
+ * used the version check will not take place.
+ * \return the return code for the function call.
+ * ZOK operation completed successfully.
+ * ZNONODE the node does not exist.
+ * ZNOAUTH the client does not have permission.
+ * ZBADVERSION expected version does not match actual version.
+ * ZBADARGUMENTS invalid input parameters.
+ * ZINVALIDSTATE zhandle state is either ZOO_SESSION_EXPIRED_STATE or ZOO_AUTH_FAILED_STATE.
+ * ZMARSHALLINGERROR failed to marshall a request; possibly, out of memory.
+ * ZSYSTEMERROR the internal request to the ZooKeeper process did not complete.
+ */
+ int set(const std::string &path, const std::string &data, int version);
+
+ /**
* \brief return a string describing the last error.
*
* \return string corresponding to the last error