You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:08:03 UTC
svn commit: r1132253 [1/5] - in /incubator/mesos/trunk: src/ src/detector/
src/exec/ src/master/ src/messaging/ src/sched/ src/slave/ src/tests/
third_party/libprocess/
third_party/libprocess/third_party/ry-http-parser-1c3624a/
Author: benh
Date: Sun Jun 5 09:08:02 2011
New Revision: 1132253
URL: http://svn.apache.org/viewvc?rev=1132253&view=rev
Log:
This is a rather large, rambling, commit. My apologies, one thing led
to the next. This commit adds a host of new features, most of them
instigated from libproces.
Libprocess:
* Changed libprocess message IDs to message names (strings).
* Changed the internal protocol used by libprocess to HTTP.
* Added support for sending data directly to a browser (Proxy).
Mesos:
* Updated code to use new message names.
* Added "handler" routines so that messages could be handled as
function callbacks (more like RPC).
* Added an example in the master of how to get an HTTP message and
respond.
Added:
incubator/mesos/trunk/src/messaging/messages.cpp
incubator/mesos/trunk/third_party/libprocess/decoder.hpp
incubator/mesos/trunk/third_party/libprocess/encoder.hpp
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/.gitignore
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/CONTRIBUTIONS
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/LICENSE-MIT
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/README.md
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/http_parser.c
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/http_parser.h
incubator/mesos/trunk/third_party/libprocess/third_party/ry-http-parser-1c3624a/test.c
incubator/mesos/trunk/third_party/libprocess/tokenize.cpp
incubator/mesos/trunk/third_party/libprocess/tokenize.hpp
Modified:
incubator/mesos/trunk/src/Makefile.in
incubator/mesos/trunk/src/detector/detector.cpp
incubator/mesos/trunk/src/detector/zookeeper.cpp
incubator/mesos/trunk/src/exec/exec.cpp
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/master/master.hpp
incubator/mesos/trunk/src/master/webui.cpp
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/messaging/messages.proto
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/lxc_isolation_module.cpp
incubator/mesos/trunk/src/slave/lxc_isolation_module.hpp
incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
incubator/mesos/trunk/src/slave/process_based_isolation_module.hpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/slave/slave.hpp
incubator/mesos/trunk/src/slave/webui.cpp
incubator/mesos/trunk/src/tests/master_test.cpp
incubator/mesos/trunk/src/tests/utils.hpp
incubator/mesos/trunk/third_party/libprocess/Makefile.in
incubator/mesos/trunk/third_party/libprocess/pid.cpp
incubator/mesos/trunk/third_party/libprocess/pid.hpp
incubator/mesos/trunk/third_party/libprocess/process.cpp
incubator/mesos/trunk/third_party/libprocess/process.hpp
Modified: incubator/mesos/trunk/src/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/Makefile.in?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/Makefile.in (original)
+++ incubator/mesos/trunk/src/Makefile.in Sun Jun 5 09:08:02 2011
@@ -134,7 +134,8 @@ SWIG_WEBUI_OBJ = $(MASTER_SWIG_WEBUI_OBJ
COMMON_OBJ = common/fatal.o common/lock.o detector/detector.o \
detector/url_processor.o configurator/configurator.o \
common/string_utils.o common/logging.o \
- common/date_utils.o
+ common/date_utils.o common/tokenize.o common/resources.o \
+ messaging/messages.o
ifeq ($(WITH_ZOOKEEPER),1)
COMMON_OBJ += detector/zookeeper.o
@@ -204,6 +205,14 @@ MESOS_JAVA_JAR = $(LIBDIR)/java/mesos.ja
MESOS_PYTHON_LIB = $(LIBDIR)/python/_mesos.so
+MESOS_PYTHON_LIB_OBJ = python/native/module.o \
+ python/native/proxy_scheduler.o \
+ python/native/mesos_scheduler_driver_impl.o \
+ python/native/proxy_executor.o \
+ python/native/mesos_executor_driver_impl.o
+
+MESOS_PYTHON_FILE = $(LIBDIR)/python/mesos.py
+
# We copy all the webui files into the bin directory.
WEBUI_FILES = $(BINDIR)/webui/bottle-0.8.3 \
$(BINDIR)/webui/common/webui_lib.py \
@@ -326,7 +335,7 @@ $(MESOS_PROJD_EXE): $(SRCDIR)/slave/proj
java: $(MESOS_JAVA_LIB) $(MESOS_JAVA_JAR)
-$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java | $(LIBDIR)/java
+$(MESOS_JAVA_JAR): $(SRCDIR)/java/src/mesos/*.java @top_srcdir@/include/mesos.proto | $(LIBDIR)/java
ifdef JAVA_HOME
mkdir -p @top_builddir@/$(PROTOBUF)/java/src/main/java
$(PROTOC) --java_out=@top_builddir@/$(PROTOBUF)/java/src/main/java -I@top_srcdir@/$(PROTOBUF)/src @top_srcdir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
@@ -341,21 +350,32 @@ ifdef JAVA_HOME
endif
$(MESOS_JAVA_LIB_OBJ): %.o: $(SRCDIR)/%.cpp $(MESOS_JAVA_JAR)
- $(CXX) -c $(CXXFLAGS) -Ijava/jni -I$(JAVA_HEADERS) -I$(JAVA_HEADERS)/$(OS_NAME) -o $@ $<
+ $(CXX) -c $(CXXFLAGS) -Ijava/jni -I$(JAVA_HOME)/include -I$(JAVA_HOME)/include/$(OS_NAME) -I $(JAVA_HEADERS) -o $@ $<
$(MESOS_JAVA_LIB): $(MESOS_JAVA_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/java
ifdef JAVA_HOME
$(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_JAVA_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(LIBS)
endif
-python: $(MESOS_PYTHON_LIB)
+python: $(MESOS_PYTHON_LIB) $(MESOS_PYTHON_FILE) $(MESOS_PYTHON_PROTOBUFS)
+
+$(MESOS_PYTHON_LIB_OBJ): %.o: $(SRCDIR)/%.cpp
+ifdef PYTHON_HEADERS
+ mkdir -p python/native
+ $(CXX) -c $(CXXFLAGS) -Ipython/native -I$(PYTHON_HEADERS) -o $@ $<
+endif
+
+$(MESOS_PYTHON_LIB): $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/python
+ifdef PYTHON_HEADERS
+ $(CXX) $(CXXFLAGS) -shared -o $@ $(MESOS_PYTHON_LIB_OBJ) $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(PYTHON_LDFLAGS) $(LIBS)
+endif
-$(MESOS_PYTHON_LIB): $(SRCDIR)/swig/mesos.i $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) | $(LIBDIR)/python
+$(MESOS_PYTHON_FILE): $(SRCDIR)/python/src/mesos.py @top_srcdir@/include/mesos.proto | $(LIBDIR)/python
ifdef PYTHON_HEADERS
- mkdir -p swig/python
- $(SWIG) -c++ -python -threads -I@top_srcdir@/include -o swig/python/mesos_wrap.cpp -outdir swig/python $(SRCDIR)/swig/mesos.i
- $(CXX) $(CXXFLAGS) -I$(PYTHON_HEADERS) -shared -o $@ swig/python/mesos_wrap.cpp $(MESOS_SCHED_LIB) $(MESOS_EXEC_LIB) $(LDFLAGS) $(PYTHON_LDFLAGS) $(LIBS)
- cp -r swig/python/mesos.py $(LIBDIR)/python/mesos.py
+ cp -r @top_srcdir@/$(PROTOBUF)/python @top_builddir@/$(PROTOBUF)/python
+ cp $< $@
+ $(PROTOC) --python_out=@top_builddir@/$(PROTOBUF)/python -I@top_srcdir@/$(PROTOBUF)/src @top_srcdir@/$(PROTOBUF)/src/google/protobuf/descriptor.proto
+ $(PROTOC) --python_out=$(LIBDIR)/python -I@top_srcdir@/include @top_srcdir@/include/mesos.proto
endif
$(WEBUI_FILES): $(BINDIR)/%: $(SRCDIR)/% | $(WEBUI_DIRECTORIES)
@@ -375,8 +395,7 @@ $(CONFDIR)/deploy-env.sh: | $(SRCDIR)/co
test: all
$(MAKE) -C tests test
-#all: $(MESOS_LIBS) $(MESOS_EXES) java python $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
-all: $(MESOS_LIBS) $(MESOS_EXES) java $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
+all: $(MESOS_LIBS) $(MESOS_EXES) java python $(WEBUI_FILES) $(CONF_FILES) $(DEPLOY_FILES)
$(MAKE) -C examples
$(MAKE) -C tests
Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun 5 09:08:02 2011
@@ -42,17 +42,11 @@ public:
protected:
virtual void operator () ()
{
- switch (receive(120)) {
- case PROCESS_TIMEOUT: {
- LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
- << "(automagically) reconnect";
- MesosProcess::post(pid, MASTER_DETECTION_FAILURE);
- break;
- }
-
- default: {
- break;
- }
+ receive(120);
+ if (name() == TIMEOUT) {
+ LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
+ << "(automagically) reconnect";
+ MesosProcess::post(pid, MASTER_DETECTION_FAILURE);
}
}
@@ -217,14 +211,14 @@ BasicMasterDetector::BasicMasterDetector
{
// Send a master token.
{
- Message<GOT_MASTER_TOKEN> msg;
+ MSG<GOT_MASTER_TOKEN> msg;
msg.set_token("0");
MesosProcess::post(master, msg);
}
// Elect the master.
{
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(master);
MesosProcess::post(master, msg);
}
@@ -239,21 +233,21 @@ BasicMasterDetector::BasicMasterDetector
if (elect) {
// Send a master token.
{
- Message<GOT_MASTER_TOKEN> msg;
+ MSG<GOT_MASTER_TOKEN> msg;
msg.set_token("0");
MesosProcess::post(master, msg);
}
// Elect the master.
{
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(master);
MesosProcess::post(master, msg);
}
}
// Tell the pid about the master.
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(master);
MesosProcess::post(pid, msg);
}
@@ -267,14 +261,14 @@ BasicMasterDetector::BasicMasterDetector
if (elect) {
// Send a master token.
{
- Message<GOT_MASTER_TOKEN> msg;
+ MSG<GOT_MASTER_TOKEN> msg;
msg.set_token("0");
MesosProcess::post(master, msg);
}
// Elect the master.
{
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(master);
MesosProcess::post(master, msg);
}
@@ -282,7 +276,7 @@ BasicMasterDetector::BasicMasterDetector
// Tell each pid about the master.
foreach (const PID &pid, pids) {
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(master);
MesosProcess::post(pid, msg);
}
@@ -326,7 +320,7 @@ ZooKeeperMasterDetector::~ZooKeeperMaste
{
// Kill the timer (if running), and then the actual ZooKeeper instance.
if (timer != NULL) {
- Process::post(timer->self(), PROCESS_MSGID);
+ Process::post(timer->self(), TERMINATE);
Process::wait(timer->self());
delete timer;
timer = NULL;
@@ -390,7 +384,7 @@ void ZooKeeperMasterDetector::process(Zo
setId(result);
LOG(INFO) << "Created ephemeral/sequence:" << getId();
- Message<GOT_MASTER_TOKEN> msg;
+ MSG<GOT_MASTER_TOKEN> msg;
msg.set_token(getId());
MesosProcess::post(pid, msg);
}
@@ -403,7 +397,7 @@ void ZooKeeperMasterDetector::process(Zo
// Kill the reconnect timer.
if (timer != NULL) {
- Process::post(timer->self(), PROCESS_MSGID);
+ Process::post(timer->self(), TERMINATE);
Process::wait(timer->self());
delete timer;
timer = NULL;
@@ -510,7 +504,7 @@ void ZooKeeperMasterDetector::detectMast
if (currentMasterPID == PID()) {
MesosProcess::post(pid, NO_MASTER_DETECTED);
} else {
- Message<NEW_MASTER_DETECTED> msg;
+ MSG<NEW_MASTER_DETECTED> msg;
msg.set_pid(currentMasterPID);
MesosProcess::post(pid, msg);
}
Modified: incubator/mesos/trunk/src/detector/zookeeper.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/zookeeper.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/zookeeper.cpp (original)
+++ incubator/mesos/trunk/src/detector/zookeeper.cpp Sun Jun 5 09:08:02 2011
@@ -5,293 +5,150 @@
#include <process.hpp>
+#include <boost/tuple/tuple.hpp>
+
#include "zookeeper.hpp"
#include "common/fatal.hpp"
+using boost::cref;
+using boost::tuple;
+
+using std::cerr;
+using std::cout;
+using std::endl;
using std::map;
using std::string;
using std::vector;
-/* Forward (and first) declaration of ZooKeeperProcess. */
-class ZooKeeperProcess;
-
-
-enum {
- /* Generic messages: */
- TERMINATE = PROCESS_MSGID, // Terminate process.
-
- /* WatcherProcessManager messages: */
- OK, // Generic success response.
- ERROR, // Generic failure response.
- REGISTER, // Register WatcherProcess.
- UNREGISTER, // Unregister WatcherProcess.
- LOOKUP_PROCESS, // Lookup WatcherProcess associated with Watcher.
- LOOKUP_PID, // Lookup WatcherProcess PID associated with Watcher.
-
- /* WatcherProcess messages: */
- EVENT, // Invoke Watcher::process callback.
-
- /* ZooKeeperProcess messages: */
- COMPLETED, // After an asynchronous "call" has completed.
- CREATE, // Perform an asynchronous create.
- REMOVE, // Perform an asynchronous remove (delete).
- EXISTS, // Perform an asysnchronous exists.
- GET, // Perform an asynchronous get.
- GET_CHILDREN, // Perform an asynchronous get_children.
- SET, // Perform an asynchronous set.
-};
-
+/* Singleton instance of WatcherProcessManager. */
+class WatcherProcessManager;
-/* Event "message" for invoking Watcher. */
-struct Event
-{
- ZooKeeper *zk;
- int type;
- int state;
- string path;
-};
-
-
-/* Create "message" for performing ZooKeeper::create. */
-struct CreateCall
-{
- int ret;
- const string *path;
- const string *data;
- const ACL_vector *acl;
- int flags;
- string *result;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Remove "message" for performing ZooKeeper::remove. */
-struct RemoveCall
-{
- int ret;
- const string *path;
- int version;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Exists "message" for performing ZooKeeper::exists. */
-struct ExistsCall
-{
- int ret;
- const string *path;
- bool watch;
- Stat *stat;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Get "message" for performing ZooKeeper::get. */
-struct GetCall
-{
- int ret;
- const string *path;
- bool watch;
- string *result;
- Stat *stat;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* GetChildren "message" for performing ZooKeeper::getChildren. */
-struct GetChildrenCall
-{
- int ret;
- const string *path;
- bool watch;
- vector<string> *results;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* Set "message" for performing ZooKeeper::set. */
-struct SetCall
-{
- int ret;
- const string *path;
- const string *data;
- int version;
- PID from;
- ZooKeeperProcess *zooKeeperProcess;
-};
-
-
-/* PID of singleton instance of WatcherProcessManager. */
-PID manager;
+WatcherProcessManager *manager;
/**
* In order to make callbacks on Watcher, we create a proxy
- * WatcherProcess. The ZooKeeperProcess (defined below) stores the PID
- * of the WatcherProcess associated with a watcher and sends it an
- * event "message" which it uses to invoke Watcher::process. Care
- * needed to be taken to assure that a WatcherProcess was only valid
- * as long as a Watcher was valid. This was done by ensuring that the
- * WatcherProcess object created gets cleaned up in ~Watcher(). We
- * wanted to keep the Watcher interface clean and simple, so rather
- * than add a member in Watcher that points to a WatcherProcess
- * instance (or points to a WatcherImpl), we choose to create a
- * WatcherProcessManager that stores the Watcher and WatcherProcess
- * associations. The WatcherProcessManager is akin to having a shared
- * dictionary or hashtable and using locks to access it rathe then
- * sending and receiving messages. Their is probably a performance hit
- * here, but it would be interesting to see how bad the perforamnce is
- * across a range of low and high-contention states.
+ * WatcherProcess. The ZooKeeperProcess (defined below) dispatches
+ * "events" to the WatcherProcess which then invokes
+ * Watcher::process. Care needed to be taken to assure that a
+ * WatcherProcess was only valid as long as a Watcher was valid. This
+ * was done by ensuring that the WatcherProcess object created gets
+ * "terminated" in ~Watcher(). A pointer to the WatcherProcess might
+ * still be valid in ZooKeeperProcess (see below), but any messages
+ * sent to it will get dropped because it is no longer running. The
+ * ZooKeeperProcess is responsible for actually deleting the
+ * WatcherProcess. We wanted to keep the Watcher interface clean and
+ * simple, so rather than add a member in Watcher that points to a
+ * WatcherProcess instance (or points to a WatcherImpl), we choose to
+ * create a WatcherProcessManager that stores the Watcher and
+ * WatcherProcess associations. The WatcherProcessManager is akin to
+ * having a shared dictionary or hashtable and using locks to access
+ * it rather then sending and receiving messages. Their is probably a
+ * performance hit here, but it would be interesting to see how bad
+ * the perforamnce is across a range of low and high-contention
+ * states.
*/
class WatcherProcess : public Process
{
- friend class WatcherProcessManager;
+public:
+ WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
-private:
- Watcher *watcher;
+ void event(ZooKeeper *zk, int type, int state, const char *path)
+ {
+ watcher->process(zk, type, state, path);
+ }
protected:
- void operator () ()
+ virtual void operator () ()
{
- WatcherProcess *process = this;
- send(manager, REGISTER,
- reinterpret_cast<char *>(&process), sizeof(process));
- if (receive() != OK)
- fatal("failed to setup underlying watcher mechanism");
- while (true) {
- switch (receive()) {
- case EVENT: {
- Event *event =
- *reinterpret_cast<Event **>(const_cast<char *>(body(NULL)));
- watcher->process(event->zk, event->type, event->state, event->path);
- delete event;
- break;
- }
- case TERMINATE:
- send(manager, UNREGISTER,
- reinterpret_cast<char *>(&process), sizeof(process));
- if (receive() != OK)
- fatal("failed to cleanup underlying watcher mechanism");
- return;
- }
- }
+ do serve();
+ while (name() != TERMINATE);
}
-public:
- WatcherProcess(Watcher *_watcher) : watcher(_watcher) {}
+private:
+ Watcher *watcher;
};
class WatcherProcessManager : public Process
{
-private:
- map<Watcher *, WatcherProcess *> watchers;
+public:
+ Result<WatcherProcess *> create(Watcher *watcher)
+ {
+ WatcherProcess *process = new WatcherProcess(watcher);
+ spawn(process);
+ processes[watcher] = process;
+ return process;
+ }
-protected:
- void operator () ()
+ void destroy(Watcher *watcher)
{
- while (true) {
- switch (receive()) {
- case REGISTER: {
- WatcherProcess *process =
- *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
- Watcher *watcher = process->watcher;
- assert(watchers.find(watcher) == watchers.end());
- watchers[watcher] = process;
- send(from(), OK);
- break;
- }
- case UNREGISTER: {
- WatcherProcess *process =
- *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
- Watcher *watcher = process->watcher;
- assert(watchers.find(watcher) != watchers.end());
- watchers.erase(watcher);
- send(from(), OK);
- break;
- }
- case LOOKUP_PROCESS: {
- Watcher *watcher =
- *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
- if (watchers.find(watcher) != watchers.end()) {
- WatcherProcess *process = watchers[watcher];
- send(from(), OK, reinterpret_cast<char *>(&process), sizeof(process));
- } else {
- send(from(), ERROR);
- }
- break;
- }
- case LOOKUP_PID: {
- Watcher *watcher =
- *reinterpret_cast<Watcher **>(const_cast<char *>(body(NULL)));
- if (watchers.find(watcher) != watchers.end()) {
- WatcherProcess *process = watchers[watcher];
- const PID &pid = process->self();
- send(from(), OK, reinterpret_cast<const char *>(&pid), sizeof(pid));
- } else {
- send(from(), ERROR);
- }
- break;
- }
- }
+ if (processes.count(watcher) > 0) {
+ WatcherProcess *process = processes[watcher];
+ processes.erase(watcher);
+ send(process->self(), TERMINATE);
+ wait(process->self());
+ delete process;
+ }
+ }
+
+ Result<bool> terminate(Watcher *watcher)
+ {
+ if (processes.count(watcher) > 0) {
+ WatcherProcess *process = processes[watcher];
+ send(process->self(), TERMINATE);
+ wait(process->self());
+ return true;
+ }
+
+ return false;
+ }
+
+ Result<WatcherProcess *> lookup(Watcher *watcher)
+ {
+ if (processes.count(watcher) > 0) {
+ return processes[watcher];
+ } else {
+ return NULL;
}
}
+
+private:
+ map<Watcher *, WatcherProcess *> processes;
};
Watcher::Watcher()
{
- // Confirm we have allocated the WatcherProcessManager.
+ // Confirm we have created the WatcherProcessManager.
static volatile bool initialized = false;
static volatile bool initializing = true;
// Confirm everything is initialized.
if (!initialized) {
if (__sync_bool_compare_and_swap(&initialized, false, true)) {
- manager = Process::spawn(new WatcherProcessManager());
- initializing = false;
- }
+ manager = new WatcherProcessManager();
+ Process::spawn(manager);
+ initializing = false;
}
+ }
while (initializing);
- Process::spawn(new WatcherProcess(this));
+ WatcherProcess *process =
+ Process::call(manager, &WatcherProcessManager::create, this);
+
+ if (process == NULL) {
+ fatal("failed to initialize Watcher");
+ }
}
Watcher::~Watcher()
{
- class WatcherProcessWaiter : public Process
- {
- private:
- Watcher *watcher;
-
- protected:
- void operator () ()
- {
- send(manager, LOOKUP_PROCESS,
- reinterpret_cast<char *>(&watcher), sizeof(watcher));
- if (receive() != OK)
- fatal("failed to deallocate resources associated with Watcher");
- WatcherProcess *process =
- *reinterpret_cast<WatcherProcess **>(const_cast<char *>(body(NULL)));
- send(process->self(), TERMINATE);
- wait(process->self());
- delete process;
- }
-
- public:
- WatcherProcessWaiter(Watcher *_watcher) : watcher(_watcher) {}
- } watcherProcessWaiter(this);
-
- Process::wait(Process::spawn(&watcherProcessWaiter));
+ Process::call(manager, &WatcherProcessManager::terminate, this);
}
@@ -300,159 +157,158 @@ class ZooKeeperImpl : public Process {};
class ZooKeeperProcess : public ZooKeeperImpl
{
- friend class ZooKeeper;
-
-private:
- ZooKeeper *zk; // ZooKeeper instance.
- string hosts; // ZooKeeper host:port pairs.
- int timeout; // ZooKeeper session timeout.
- zhandle_t *zh; // ZooKeeper connection handle.
-
- Watcher *watcher; // Associated Watcher instance.
- PID watcherProcess; // PID of WatcherProcess that invokes Watcher.
-
- static void watch(zhandle_t *zh, int type, int state,
- const char *path, void *context)
- {
- ZooKeeperProcess *zooKeeperProcess =
- static_cast<ZooKeeperProcess*>(context);
- Event *event = new Event();
- event->zk = zooKeeperProcess->zk;
- event->type = type;
- event->state = state;
- event->path = path;
- zooKeeperProcess->send(zooKeeperProcess->watcherProcess,
- EVENT,
- reinterpret_cast<char *>(&event),
- sizeof(Event *));
- }
-
- static void createCompletion(int ret, const char *value, const void *data)
+public:
+ ZooKeeperProcess(ZooKeeper *_zk, const string &_hosts, int _timeout,
+ Watcher *_watcher)
+ : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher),
+ watcherProcess(NULL)
{
- CreateCall *createCall =
- static_cast<CreateCall *>(const_cast<void *>((data)));
- createCall->ret = ret;
- if (createCall->result != NULL && value != NULL)
- createCall->result->assign(value);
- createCall->zooKeeperProcess->send(createCall->from, COMPLETED);
- }
+ if (watcher == NULL) {
+ fatalerror("cannot instantiate ZooKeeper with NULL watcher");
+ }
- static void removeCompletion(int ret, const void *data)
- {
- RemoveCall *removeCall =
- static_cast<RemoveCall *>(const_cast<void *>((data)));
- removeCall->ret = ret;
- removeCall->zooKeeperProcess->send(removeCall->from, COMPLETED);
+ zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
+ if (zh == NULL) {
+ fatalerror("failed to create ZooKeeper (zookeeper_init)");
+ }
}
- static void existsCompletion(int ret, const Stat *stat, const void *data)
+ ~ZooKeeperProcess()
{
- ExistsCall *existsCall =
- static_cast<ExistsCall *>(const_cast<void *>((data)));
- existsCall->ret = ret;
- if (existsCall->stat != NULL && stat != NULL)
- *(existsCall->stat) = *(stat);
- existsCall->zooKeeperProcess->send(existsCall->from, COMPLETED);
+ int ret = zookeeper_close(zh);
+ if (ret != ZOK) {
+ fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
+ }
+
+ if (watcherProcess != NULL) {
+ Process::dispatch(manager, &WatcherProcessManager::destroy, watcher);
+ }
}
- static void getCompletion(int ret, const char *value, int value_len,
- const Stat *stat, const void *data)
+public:
+ Result<int> create(const string &path, const string &data,
+ const ACL_vector &acl, int flags, string *result)
{
- GetCall *getCall =
- static_cast<GetCall *>(const_cast<void *>((data)));
- getCall->ret = ret;
- if (getCall->result != NULL && value != NULL && value_len > 0)
- getCall->result->assign(value, value_len);
- if (getCall->stat != NULL && stat != NULL)
- *(getCall->stat) = *(stat);
- getCall->zooKeeperProcess->send(getCall->from, COMPLETED);
+ Promise<int> promise;
+
+ tuple<Promise<int>, string *> *args =
+ new tuple<Promise<int>, string *>(promise, result);
+
+ int ret = zoo_acreate(zh, path.c_str(), data.data(), data.size(), &acl,
+ flags, stringCompletion, args);
+
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
+ }
+
+ return promise;
}
- static void getChildrenCompletion(int ret, const String_vector *results,
- const void *data)
+ Result<int> remove(const string &path, int version)
{
- GetChildrenCall *getChildrenCall =
- static_cast<GetChildrenCall *>(const_cast<void *>((data)));
- getChildrenCall->ret = ret;
- if (getChildrenCall->results != NULL && results != NULL) {
- for (int i = 0; i < results->count; i++) {
- getChildrenCall->results->push_back(results->data[i]);
- }
+ Promise<int> promise;
+
+ tuple<Promise<int> > *args = new tuple<Promise<int> >(promise);
+
+ int ret = zoo_adelete(zh, path.c_str(), version, voidCompletion, args);
+
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
}
- getChildrenCall->zooKeeperProcess->send(getChildrenCall->from, COMPLETED);
+
+ return promise;
}
- static void setCompletion(int ret, const Stat *stat, const void *data)
+ Result<int> exists(const string &path, bool watch, Stat *stat)
{
- SetCall *setCall =
- static_cast<SetCall *>(const_cast<void *>((data)));
- setCall->ret = ret;
- setCall->zooKeeperProcess->send(setCall->from, COMPLETED);
+ Promise<int> promise;
+
+ tuple<Promise<int>, Stat *> *args =
+ new tuple<Promise<int>, Stat *>(promise, stat);
+
+ int ret = zoo_aexists(zh, path.c_str(), watch, statCompletion, args);
+
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
+ }
+
+ return promise;
}
- bool prepare(int *fd, int *ops, timeval *tv)
+ Result<int> get(const string &path, bool watch, string *result, Stat *stat)
{
- int interest = 0;
+ Promise<int> promise;
- int ret = zookeeper_interest(zh, fd, &interest, tv);
+ tuple<Promise<int>, string *, Stat *> *args =
+ new tuple<Promise<int>, string *, Stat *>(promise, result, stat);
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE ||
- ret == ZCONNECTIONLOSS ||
- ret == ZOPERATIONTIMEOUT)
- return false;
+ int ret = zoo_aget(zh, path.c_str(), watch, dataCompletion, args);
- if (ret != ZOK)
- fatal("zookeeper_interest failed! (%s)", zerror(ret));
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
+ }
- *ops = 0;
+ return promise;
+ }
- if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
- *ops |= RDWR;
- } else if (interest & ZOOKEEPER_READ) {
- *ops |= RDONLY;
- } else if (interest & ZOOKEEPER_WRITE) {
- *ops |= WRONLY;
+ Result<int> getChildren(const string &path, bool watch,
+ vector<string> *results)
+ {
+ Promise<int> promise;
+
+ tuple<Promise<int>, vector<string> *> *args =
+ new tuple<Promise<int>, vector<string> *>(promise, results);
+
+ int ret = zoo_aget_children(zh, path.c_str(), watch, stringsCompletion,
+ args);
+
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
}
- return true;
+ return promise;
}
- void process(int fd, int ops)
+ Result<int> set(const string &path, const string &data, int version)
{
- int events = 0;
+ Promise<int> promise;
- if (ready(fd, RDONLY)) {
- events |= ZOOKEEPER_READ;
- } if (ready(fd, WRONLY)) {
- events |= ZOOKEEPER_WRITE;
- }
+ tuple<Promise<int>, Stat *> *args =
+ new tuple<Promise<int>, Stat *>(promise, NULL);
- int ret = zookeeper_process(zh, events);
+ int ret = zoo_aset(zh, path.c_str(), data.data(), data.size(),
+ version, statCompletion, args);
- // If in some disconnected state, try again later.
- if (ret == ZINVALIDSTATE ||
- ret == ZCONNECTIONLOSS)
- return;
+ if (ret != ZOK) {
+ promise.set(ret);
+ delete args;
+ }
- if (ret != ZOK && ret != ZNOTHING)
- fatal("zookeeper_process failed! (%s)", zerror(ret));
+ return promise;
}
protected:
- void operator () ()
+ virtual void operator () ()
{
- // Lookup and cache the WatcherProcess PID associated with our
- // Watcher _before_ we yield control via calling zookeeper_process
- // so that Watcher callbacks can occur.
- send(manager, LOOKUP_PID,
- reinterpret_cast<char *>(&watcher), sizeof(watcher));
- if (receive() != OK)
- fatal("failed to setup underlying ZooKeeper mechanisms");
+ // Lookup and cache the WatcherProcess associated with our Watcher
+ // before we yield control via calling zookeeper_process so that
+ // Watcher callbacks can occur.
+ watcherProcess = call(manager, &WatcherProcessManager::lookup, watcher);
+
+ // A NULL WatcherProcess means that our Watcher must not have been
+ // constructed correctly. Note that a Watcher might have already
+ // been cleaned up (and therefore the WatcherProcess terminated),
+ // but we are responsible for actually calling destroy and thus
+ // cleaning up the WatcherProcess, so it shouldn't be NULL here.
+ if (watcherProcess == NULL)
+ fatal("cannot use Watcher (has it been constructed?)");
// TODO(benh): Link with WatcherProcess?
- watcherProcess =
- *reinterpret_cast<PID *>(const_cast<char *>(body(NULL)));
while (true) {
int fd;
@@ -466,405 +322,262 @@ protected:
// Cause await to return immediately if the file descriptor is
// not valid (for example because the connection timed out) and
// secs is 0 because that will block indefinitely.
- if (fd == -1 && secs == 0)
+ if (fd == -1 && secs == 0) {
secs = -1;
+ }
if (await(fd, ops, secs, false)) {
// Either timer expired (might be 0) or data became available on fd.
process(fd, ops);
} else {
- // TODO(benh): Don't handle incoming "calls" until we are connected!
- switch (receive()) {
- case CREATE: {
- CreateCall *createCall =
- *reinterpret_cast<CreateCall **>(const_cast<char *>(body(NULL)));
- createCall->from = from();
- createCall->zooKeeperProcess = this;
- int ret = zoo_acreate(zh, createCall->path->c_str(),
- createCall->data->data(),
- createCall->data->size(),
- createCall->acl, createCall->flags,
- createCompletion, createCall);
- if (ret != ZOK) {
- createCall->ret = ret;
- send(createCall->from, COMPLETED);
- }
- break;
- }
- case REMOVE: {
- RemoveCall *removeCall =
- *reinterpret_cast<RemoveCall **>(const_cast<char *>(body(NULL)));
- removeCall->from = from();
- removeCall->zooKeeperProcess = this;
- int ret = zoo_adelete(zh, removeCall->path->c_str(),
- removeCall->version,
- removeCompletion, removeCall);
- if (ret != ZOK) {
- removeCall->ret = ret;
- send(removeCall->from, COMPLETED);
- }
- break;
- }
- case EXISTS: {
- ExistsCall *existsCall =
- *reinterpret_cast<ExistsCall **>(const_cast<char *>(body(NULL)));
- existsCall->from = from();
- existsCall->zooKeeperProcess = this;
- int ret = zoo_aexists(zh, existsCall->path->c_str(),
- existsCall->watch,
- existsCompletion, existsCall);
- if (ret != ZOK) {
- existsCall->ret = ret;
- send(existsCall->from, COMPLETED);
- }
- break;
- }
- case GET: {
- GetCall *getCall =
- *reinterpret_cast<GetCall **>(const_cast<char *>(body(NULL)));
- getCall->from = from();
- getCall->zooKeeperProcess = this;
- int ret = zoo_aget(zh, getCall->path->c_str(), getCall->watch,
- getCompletion, getCall);
- if (ret != ZOK) {
- getCall->ret = ret;
- send(getCall->from, COMPLETED);
- }
- break;
- }
- case GET_CHILDREN: {
- GetChildrenCall *getChildrenCall =
- *reinterpret_cast<GetChildrenCall **>(const_cast<char *>(body(NULL)));
- getChildrenCall->from = from();
- getChildrenCall->zooKeeperProcess = this;
- int ret = zoo_aget_children(zh, getChildrenCall->path->c_str(),
- getChildrenCall->watch,
- getChildrenCompletion,
- getChildrenCall);
- if (ret != ZOK) {
- getChildrenCall->ret = ret;
- send(getChildrenCall->from, COMPLETED);
- }
- break;
- }
- case SET: {
- SetCall *setCall =
- *reinterpret_cast<SetCall **>(const_cast<char *>(body(NULL)));
- setCall->from = from();
- setCall->zooKeeperProcess = this;
- int ret = zoo_aset(zh, setCall->path->c_str(),
- setCall->data->data(),
- setCall->data->size(),
- setCall->version,
- setCompletion, setCall);
- if (ret != ZOK) {
- setCall->ret = ret;
- send(setCall->from, COMPLETED);
- }
- break;
- }
- case TERMINATE: {
- return;
- }
- default: {
- fatal("unexpected interruption during await");
- }
- }
+ // Okay, a message must have been received. Handle only one
+ // message at a time so as not to delay any necessary internal
+ // processing.
+ serve(0, false);
+ if (name() == TERMINATE) {
+ return;
+ } else {
+ fatal("unexpected interruption during await");
+ }
}
}
}
-public:
- ZooKeeperProcess(ZooKeeper *_zk,
- const string &_hosts,
- int _timeout,
- Watcher *_watcher)
- : zk(_zk), hosts(_hosts), timeout(_timeout), watcher(_watcher)
+private:
+ static void watch(zhandle_t *zh, int type, int state,
+ const char *path, void *ctx)
{
- zh = zookeeper_init(hosts.c_str(), watch, timeout, NULL, this, 0);
- if (zh == NULL)
- fatalerror("failed to create ZooKeeper (zookeeper_init)");
+ ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess*>(ctx);
+ Process::dispatch(zooKeeperProcess->watcherProcess, &WatcherProcess::event,
+ zooKeeperProcess->zk, type, state, path);
}
- ~ZooKeeperProcess()
+ static void voidCompletion(int ret, const void *data)
{
- int ret = zookeeper_close(zh);
- if (ret != ZOK)
- fatal("failed to destroy ZooKeeper (zookeeper_close): %s", zerror(ret));
- }
-};
+ const tuple<Promise<int> > *args =
+ reinterpret_cast<const tuple<Promise<int> > *>(data);
+ Promise<int> promise = (*args).get<0>();
+ promise.set(ret);
-ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
-{
- impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
- Process::spawn(impl);
-}
+ delete args;
+ }
-ZooKeeper::~ZooKeeper()
-{
- Process::post(impl->self(), TERMINATE);
- Process::wait(impl->self());
- delete impl;
-}
+ static void stringCompletion(int ret, const char *value, const void *data)
+ {
+ const tuple<Promise<int>, string *> *args =
+ reinterpret_cast<const tuple<Promise<int>, string *> *>(data);
+ Promise<int> promise = (*args).get<0>();
+ string *result = (*args).get<1>();
-int ZooKeeper::getState()
-{
- ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess *>(impl);
- return zoo_state(zooKeeperProcess->zh);
-}
+ if (result != NULL && value != NULL) {
+ result->assign(value);
+ }
+ promise.set(ret);
-int ZooKeeper::create(const string &path,
- const string &data,
- const ACL_vector &acl,
- int flags,
- string *result)
-{
- CreateCall createCall;
- createCall.path = &path;
- createCall.data = &data;
- createCall.acl = &acl;
- createCall.flags = flags;
- createCall.result = result;
-
- class CreateCallProcess : public Process
- {
- private:
- ZooKeeperProcess *zooKeeperProcess;
- CreateCall *createCall;
-
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- CREATE,
- reinterpret_cast<char *>(&createCall),
- sizeof(CreateCall *));
- if (receive() != COMPLETED)
- createCall->ret = ZSYSTEMERROR;
- }
-
- public:
- CreateCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- CreateCall *_createCall)
- : zooKeeperProcess(_zooKeeperProcess),
- createCall(_createCall)
- {}
- } createCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &createCall);
+ delete args;
+ }
- Process::wait(Process::spawn(&createCallProcess));
- return createCall.ret;
-}
+ static void statCompletion(int ret, const Stat *stat, const void *data)
+ {
+ const tuple<Promise<int>, Stat *> *args =
+ reinterpret_cast<const tuple<Promise<int>, Stat *> *>(data);
+ Promise<int> promise = (*args).get<0>();
+ Stat *stat_result = (*args).get<1>();
+ if (stat_result != NULL && stat != NULL) {
+ *stat_result = *stat;
+ }
-int ZooKeeper::remove(const string &path, int version)
-{
- RemoveCall removeCall;
- removeCall.path = &path;
- removeCall.version = version;
+ promise.set(ret);
+
+ delete args;
+ }
- class RemoveCallProcess : public Process
+ static void dataCompletion(int ret, const char *value, int value_len,
+ const Stat *stat, const void *data)
{
- private:
- ZooKeeperProcess *zooKeeperProcess;
- RemoveCall *removeCall;
+ const tuple<Promise<int>, string *, Stat *> *args =
+ reinterpret_cast<const tuple<Promise<int>, string *, Stat *> *>(data);
+
+ Promise<int> promise = (*args).get<0>();
+ string *result = (*args).get<1>();
+ Stat *stat_result = (*args).get<2>();
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- REMOVE,
- reinterpret_cast<char *>(&removeCall),
- sizeof(RemoveCall *));
- if (receive() != COMPLETED)
- removeCall->ret = ZSYSTEMERROR;
+ if (result != NULL && value != NULL && value_len > 0) {
+ result->assign(value, value_len);
}
- public:
- RemoveCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- RemoveCall *_removeCall)
- : zooKeeperProcess(_zooKeeperProcess),
- removeCall(_removeCall)
- {}
- } removeCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &removeCall);
+ if (stat_result != NULL && stat != NULL) {
+ *stat_result = *stat;
+ }
- Process::wait(Process::spawn(&removeCallProcess));
+ promise.set(ret);
- return removeCall.ret;
-}
+ delete args;
+ }
+ static void stringsCompletion(int ret, const String_vector *values,
+ const void *data)
+ {
+ const tuple<Promise<int>, vector<string> *> *args =
+ reinterpret_cast<const tuple<Promise<int>, vector<string> *> *>(data);
-int ZooKeeper::exists(const string &path,
- bool watch,
- Stat *stat)
-{
- ExistsCall existsCall;
- existsCall.path = &path;
- existsCall.watch = watch;
- existsCall.stat = stat;
+ Promise<int> promise = (*args).get<0>();
+ vector<string> *results = (*args).get<1>();
+
+ if (results != NULL && values != NULL) {
+ for (int i = 0; i < values->count; i++) {
+ results->push_back(values->data[i]);
+ }
+ }
+
+ promise.set(ret);
- class ExistsCallProcess : public Process
+ delete args;
+ }
+
+ bool prepare(int *fd, int *ops, timeval *tv)
{
- private:
- ZooKeeperProcess *zooKeeperProcess;
- ExistsCall *existsCall;
+ int interest = 0;
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- EXISTS,
- reinterpret_cast<char *>(&existsCall),
- sizeof(ExistsCall *));
- if (receive() != COMPLETED)
- existsCall->ret = ZSYSTEMERROR;
- }
+ int ret = zookeeper_interest(zh, fd, &interest, tv);
- public:
- ExistsCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- ExistsCall *_existsCall)
- : zooKeeperProcess(_zooKeeperProcess),
- existsCall(_existsCall)
- {}
- } existsCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &existsCall);
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE ||
+ ret == ZCONNECTIONLOSS ||
+ ret == ZOPERATIONTIMEOUT) {
+ return false;
+ }
- Process::wait(Process::spawn(&existsCallProcess));
+ if (ret != ZOK) {
+ fatal("zookeeper_interest failed! (%s)", zerror(ret));
+ }
- return existsCall.ret;
-}
+ *ops = 0;
+ if ((interest & ZOOKEEPER_READ) && (interest & ZOOKEEPER_WRITE)) {
+ *ops |= RDWR;
+ } else if (interest & ZOOKEEPER_READ) {
+ *ops |= RDONLY;
+ } else if (interest & ZOOKEEPER_WRITE) {
+ *ops |= WRONLY;
+ }
-int ZooKeeper::get(const string &path,
- bool watch,
- string *result,
- Stat *stat)
-{
- GetCall getCall;
- getCall.path = &path;
- getCall.watch = watch;
- getCall.result = result;
- getCall.stat = stat;
+ return true;
+ }
- class GetCallProcess : public Process
+ void process(int fd, int ops)
{
- private:
- ZooKeeperProcess *zooKeeperProcess;
- GetCall *getCall;
+ int events = 0;
+
+ if (ready(fd, RDONLY)) {
+ events |= ZOOKEEPER_READ;
+ } if (ready(fd, WRONLY)) {
+ events |= ZOOKEEPER_WRITE;
+ }
+
+ int ret = zookeeper_process(zh, events);
+
+ // If in some disconnected state, try again later.
+ if (ret == ZINVALIDSTATE || ret == ZCONNECTIONLOSS) {
+ return;
+ }
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- GET,
- reinterpret_cast<char *>(&getCall),
- sizeof(GetCall *));
- if (receive() != COMPLETED)
- getCall->ret = ZSYSTEMERROR;
+ if (ret != ZOK && ret != ZNOTHING) {
+ fatal("zookeeper_process failed! (%s)", zerror(ret));
}
+ }
+
+private:
+ friend class ZooKeeper;
+
+ ZooKeeper *zk; // ZooKeeper instance.
+ string hosts; // ZooKeeper host:port pairs.
+ int timeout; // ZooKeeper session timeout.
+ zhandle_t *zh; // ZooKeeper connection handle.
+
+ Watcher *watcher; // Associated Watcher instance.
+ WatcherProcess *watcherProcess; // PID of WatcherProcess that invokes Watcher.
+};
- public:
- GetCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- GetCall *_getCall)
- : zooKeeperProcess(_zooKeeperProcess),
- getCall(_getCall)
- {}
- } getCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &getCall);
- Process::wait(Process::spawn(&getCallProcess));
- return getCall.ret;
+ZooKeeper::ZooKeeper(const string &hosts, int timeout, Watcher *watcher)
+{
+ impl = new ZooKeeperProcess(this, hosts, timeout, watcher);
+ Process::spawn(impl);
}
-int ZooKeeper::getChildren(const string &path,
- bool watch,
- vector<string> *results)
+ZooKeeper::~ZooKeeper()
{
- GetChildrenCall getChildrenCall;
- getChildrenCall.path = &path;
- getChildrenCall.watch = watch;
- getChildrenCall.results = results;
+ Process::post(impl->self(), TERMINATE);
+ Process::wait(impl->self());
+ delete impl;
+}
- class GetChildrenCallProcess : public Process
- {
- private:
- ZooKeeperProcess *zooKeeperProcess;
- GetChildrenCall *getChildrenCall;
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- GET_CHILDREN,
- reinterpret_cast<char *>(&getChildrenCall),
- sizeof(GetChildrenCall *));
- if (receive() != COMPLETED)
- getChildrenCall->ret = ZSYSTEMERROR;
- }
+int ZooKeeper::getState()
+{
+ ZooKeeperProcess *zooKeeperProcess = static_cast<ZooKeeperProcess *>(impl);
+ return zoo_state(zooKeeperProcess->zh);
+}
- public:
- GetChildrenCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- GetChildrenCall *_getChidlrenCall)
- : zooKeeperProcess(_zooKeeperProcess),
- getChildrenCall(_getChidlrenCall)
- {}
- } getChildrenCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &getChildrenCall);
- Process::wait(Process::spawn(&getChildrenCallProcess));
+int ZooKeeper::create(const string &path, const string &data,
+ const ACL_vector &acl, int flags, string *result)
+{
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::create,
+ cref(path), cref(data), cref(acl), flags, result);
+}
+
- return getChildrenCall.ret;
+int ZooKeeper::remove(const string &path, int version)
+{
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::remove,
+ cref(path), version);
}
-int ZooKeeper::set(const string &path,
- const string &data,
- int version)
+int ZooKeeper::exists(const string &path, bool watch, Stat *stat)
{
- SetCall setCall;
- setCall.path = &path;
- setCall.data = &data;
- setCall.version = version;
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::exists,
+ cref(path), watch, stat);
+}
- class SetCallProcess : public Process
- {
- private:
- ZooKeeperProcess *zooKeeperProcess;
- SetCall *setCall;
- protected:
- void operator () ()
- {
- send(zooKeeperProcess->self(),
- SET,
- reinterpret_cast<char *>(&setCall),
- sizeof(SetCall *));
- if (receive() != COMPLETED)
- setCall->ret = ZSYSTEMERROR;
- }
+int ZooKeeper::get(const string &path, bool watch, string *result, Stat *stat)
+{
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::get,
+ cref(path), watch, result, stat);
+}
- public:
- SetCallProcess(ZooKeeperProcess *_zooKeeperProcess,
- SetCall *_setCall)
- : zooKeeperProcess(_zooKeeperProcess),
- setCall(_setCall)
- {}
- } setCallProcess(static_cast<ZooKeeperProcess *>(impl),
- &setCall);
- Process::wait(Process::spawn(&setCallProcess));
+int ZooKeeper::getChildren(const string &path, bool watch,
+ vector<string> *results)
+{
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::getChildren,
+ cref(path), watch, results);
+}
- return setCall.ret;
+
+int ZooKeeper::set(const string &path, const string &data, int version)
+{
+ ZooKeeperProcess* process = static_cast<ZooKeeperProcess*>(impl);
+ return Process::call(process, &ZooKeeperProcess::set,
+ cref(path), cref(data), version);
}
@@ -879,7 +592,7 @@ const char * ZooKeeper::error(int ret) c
// public:
// void process(ZooKeeper *zk, int type, int state, const string &path)
// {
-// std::cout << "TestWatcher::process" << std::endl;
+// cout << "TestWatcher::process" << endl;
// }
// };
Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun 5 09:08:02 2011
@@ -1,6 +1,7 @@
#include <signal.h>
-#include <cerrno>
+#include <glog/logging.h>
+
#include <iostream>
#include <string>
#include <sstream>
@@ -25,8 +26,6 @@ using boost::bind;
using boost::cref;
using boost::unordered_map;
-using std::cerr;
-using std::endl;
using std::string;
@@ -47,10 +46,12 @@ public:
protected:
virtual void operator () ()
{
+ VLOG(1) << "Executor started at: " << self();
+
link(slave);
// Register with slave.
- Message<E2S_REGISTER_EXECUTOR> out;
+ MSG<E2S_REGISTER_EXECUTOR> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
out.mutable_executor_id()->MergeFrom(executorId);
send(slave, out);
@@ -69,18 +70,24 @@ protected:
switch(receive(2)) {
case S2E_REGISTER_REPLY: {
- const Message<S2E_REGISTER_REPLY>& msg = message();
+ const MSG<S2E_REGISTER_REPLY>& msg = message();
+
slaveId = msg.args().slave_id();
+
+ VLOG(1) << "Executor registered on slave " << slaveId;
+
invoke(bind(&Executor::init, executor, driver, cref(msg.args())));
break;
}
case S2E_RUN_TASK: {
- const Message<S2E_RUN_TASK>& msg = message();
+ const MSG<S2E_RUN_TASK>& msg = message();
const TaskDescription& task = msg.task();
- Message<E2S_STATUS_UPDATE> out;
+ VLOG(1) << "Executor asked to run a task " << task.task_id();
+
+ MSG<E2S_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(frameworkId);
TaskStatus* status = out.mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
@@ -93,14 +100,20 @@ protected:
}
case S2E_KILL_TASK: {
- const Message<S2E_KILL_TASK>& msg = message();
+ const MSG<S2E_KILL_TASK>& msg = message();
+
+ VLOG(1) << "Executor asked to kill task " << msg.task_id();
+
invoke(bind(&Executor::killTask, executor, driver,
cref(msg.task_id())));
break;
}
case S2E_FRAMEWORK_MESSAGE: {
- const Message<S2E_FRAMEWORK_MESSAGE>& msg = message();
+ const MSG<S2E_FRAMEWORK_MESSAGE>& msg = message();
+
+ VLOG(1) << "Executor passed message";
+
const FrameworkMessage& message = msg.message();
invoke(bind(&Executor::frameworkMessage, executor, driver,
cref(message)));
@@ -108,6 +121,7 @@ protected:
}
case S2E_KILL_EXECUTOR: {
+ VLOG(1) << "Executor asked to shutdown";
invoke(bind(&Executor::shutdown, executor, driver));
if (!local)
exit(0);
@@ -116,6 +130,8 @@ protected:
}
case PROCESS_EXIT: {
+ VLOG(1) << "Slave exited, trying to shutdown";
+
// TODO: Pass an argument to shutdown to tell it this is abnormal?
invoke(bind(&Executor::shutdown, executor, driver));
@@ -135,9 +151,9 @@ protected:
}
default: {
+ VLOG(1) << "Received unknown message ID " << msgid()
+ << " from " << from();
// TODO: Is this serious enough to exit?
- cerr << "Received unknown message ID " << msgid()
- << " from " << from() << endl;
break;
}
}
@@ -307,7 +323,7 @@ int MesosExecutorDriver::sendStatusUpdat
}
// TODO(benh): Do a dispatch to Executor first?
- Message<E2S_STATUS_UPDATE> out;
+ MSG<E2S_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(process->frameworkId);
out.mutable_status()->MergeFrom(status);
process->send(process->slave, out);
@@ -335,7 +351,7 @@ int MesosExecutorDriver::sendFrameworkMe
}
// TODO(benh): Do a dispatch to Executor first?
- Message<E2S_FRAMEWORK_MESSAGE> out;
+ MSG<E2S_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(process->frameworkId);
out.mutable_message()->MergeFrom(message);
process->send(process->slave, out);
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 09:08:02 2011
@@ -81,7 +81,7 @@ protected:
receive();
CHECK(msgid() == M2M_GET_STATE_REPLY);
- const Message<M2M_GET_STATE_REPLY>& msg = message();
+ const MSG<M2M_GET_STATE_REPLY>& msg = message();
state::MasterState *state =
*(state::MasterState **) msg.pointer().data();
@@ -120,14 +120,16 @@ private:
Master::Master()
- : nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+ : MesosProcess("master"),
+ nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
{
allocatorType = "simple";
}
Master::Master(const Configuration& _conf)
- : conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
+ : MesosProcess("master"),
+ conf(_conf), nextFrameworkId(0), nextSlaveId(0), nextOfferId(0)
{
allocatorType = conf.get("allocator", "simple");
}
@@ -316,7 +318,7 @@ void Master::operator () ()
<< "we haven't received an identifier yet!";
}
- const Message<GOT_MASTER_TOKEN>& msg = message();
+ const MSG<GOT_MASTER_TOKEN>& msg = message();
// The master ID is comprised of the current date and some ephemeral
// token (e.g., determined by ZooKeeper).
@@ -337,7 +339,7 @@ void Master::operator () ()
switch (receive()) {
case NEW_MASTER_DETECTED: {
- const Message<NEW_MASTER_DETECTED>& msg = message();
+ const MSG<NEW_MASTER_DETECTED>& msg = message();
// Check and see if we are (1) still waiting to be the active
// master, (2) newly active master, (3) no longer active master,
@@ -372,7 +374,7 @@ void Master::operator () ()
}
case F2M_REGISTER_FRAMEWORK: {
- const Message<F2M_REGISTER_FRAMEWORK>& msg = message();
+ const MSG<F2M_REGISTER_FRAMEWORK>& msg = message();
Framework *framework =
new Framework(msg.framework(), newFrameworkId(), from(), elapsed());
@@ -381,7 +383,7 @@ void Master::operator () ()
if (framework->info.executor().uri() == "") {
LOG(INFO) << framework << " registering without an executor URI";
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("No executor URI given");
send(from(), out);
@@ -393,7 +395,7 @@ void Master::operator () ()
if (framework->info.user() == "root" && rootSubmissions == false) {
LOG(INFO) << framework << " registering as root, but "
<< "root submissions are disabled on this cluster";
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("User 'root' is not allowed to run frameworks");
send(from(), out);
@@ -406,11 +408,11 @@ void Master::operator () ()
}
case F2M_REREGISTER_FRAMEWORK: {
- const Message<F2M_REREGISTER_FRAMEWORK> &msg = message();
+ const MSG<F2M_REREGISTER_FRAMEWORK> &msg = message();
if (msg.framework_id() == "") {
LOG(ERROR) << "Framework re-registering without an id!";
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("Missing framework id");
send(from(), out);
@@ -420,7 +422,7 @@ void Master::operator () ()
if (msg.framework().executor().uri() == "") {
LOG(INFO) << "Framework " << msg.framework_id() << " re-registering "
<< "without an executor URI";
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("No executor URI given");
send(from(), out);
@@ -449,7 +451,7 @@ void Master::operator () ()
LOG(INFO) << "Framework " << msg.framework_id()
<< " re-registering with an already used id "
<< " and not failing over!";
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("Framework id in use");
send(from(), out);
@@ -483,7 +485,7 @@ void Master::operator () ()
// it currently isn't running any tasks. This could be a
// potential scalability issue ...
foreachpair (_, Slave *slave, slaves) {
- Message<M2S_UPDATE_FRAMEWORK> out;
+ MSG<M2S_UPDATE_FRAMEWORK> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
out.set_pid(from());
send(slave->pid, out);
@@ -492,7 +494,7 @@ void Master::operator () ()
}
case F2M_UNREGISTER_FRAMEWORK: {
- const Message<F2M_UNREGISTER_FRAMEWORK>& msg = message();
+ const MSG<F2M_UNREGISTER_FRAMEWORK>& msg = message();
LOG(INFO) << "Asked to unregister framework " << msg.framework_id();
@@ -508,7 +510,7 @@ void Master::operator () ()
}
case F2M_RESOURCE_OFFER_REPLY: {
- const Message<F2M_RESOURCE_OFFER_REPLY>& msg = message();
+ const MSG<F2M_RESOURCE_OFFER_REPLY>& msg = message();
Framework *framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
@@ -528,7 +530,7 @@ void Master::operator () ()
// immediately report any tasks in it as lost (it would
// probably be better to have better error messages here).
foreach (const TaskDescription &task, tasks) {
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
TaskStatus *status = out.mutable_status();
status->mutable_task_id()->MergeFrom(task.task_id());
@@ -542,7 +544,7 @@ void Master::operator () ()
}
case F2M_REVIVE_OFFERS: {
- const Message<F2M_REVIVE_OFFERS>& msg = message();
+ const MSG<F2M_REVIVE_OFFERS>& msg = message();
Framework *framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
@@ -554,7 +556,7 @@ void Master::operator () ()
}
case F2M_KILL_TASK: {
- const Message<F2M_KILL_TASK>& msg = message();
+ const MSG<F2M_KILL_TASK>& msg = message();
LOG(INFO) << "Asked to kill task " << msg.task_id()
<< " of framework " << msg.framework_id();
@@ -568,7 +570,7 @@ void Master::operator () ()
LOG(ERROR) << "Cannot kill task " << msg.task_id()
<< " of framework " << msg.framework_id()
<< " because it cannot be found";
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(task->framework_id());
TaskStatus *status = out.mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
@@ -581,13 +583,13 @@ void Master::operator () ()
}
case F2M_FRAMEWORK_MESSAGE: {
- const Message<F2M_FRAMEWORK_MESSAGE>& msg = message();
+ const MSG<F2M_FRAMEWORK_MESSAGE>& msg = message();
Framework *framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
Slave *slave = lookupSlave(msg.message().slave_id());
if (slave != NULL) {
- Message<M2S_FRAMEWORK_MESSAGE> out;
+ MSG<M2S_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
out.mutable_message()->MergeFrom(msg.message());
send(slave->pid, out);
@@ -597,13 +599,13 @@ void Master::operator () ()
}
case F2M_STATUS_UPDATE_ACK: {
- const Message<F2M_STATUS_UPDATE_ACK>& msg = message();
+ const MSG<F2M_STATUS_UPDATE_ACK>& msg = message();
Framework *framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
Slave *slave = lookupSlave(msg.slave_id());
if (slave != NULL) {
- Message<M2S_STATUS_UPDATE_ACK> out;
+ MSG<M2S_STATUS_UPDATE_ACK> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
out.mutable_slave_id()->MergeFrom(msg.slave_id());
out.mutable_task_id()->MergeFrom(msg.task_id());
@@ -614,7 +616,7 @@ void Master::operator () ()
}
case S2M_REGISTER_SLAVE: {
- const Message<S2M_REGISTER_SLAVE>& msg = message();
+ const MSG<S2M_REGISTER_SLAVE>& msg = message();
Slave* slave = new Slave(msg.slave(), newSlaveId(), from(), elapsed());
@@ -627,7 +629,7 @@ void Master::operator () ()
allocator->slaveAdded(slave);
- Message<M2S_REGISTER_REPLY> out;
+ MSG<M2S_REGISTER_REPLY> out;
out.mutable_slave_id()->MergeFrom(slave->slaveId);
out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
send(slave->pid, out);
@@ -635,7 +637,7 @@ void Master::operator () ()
}
case S2M_REREGISTER_SLAVE: {
- const Message<S2M_REREGISTER_SLAVE>& msg = message();
+ const MSG<S2M_REREGISTER_SLAVE>& msg = message();
LOG(INFO) << "Re-registering " << msg.slave_id() << " at " << from();
@@ -662,7 +664,7 @@ void Master::operator () ()
pidToSlaveId[slave->pid] = slave->slaveId;
link(slave->pid);
- Message<M2S_REREGISTER_REPLY> out;
+ MSG<M2S_REREGISTER_REPLY> out;
out.mutable_slave_id()->MergeFrom(slave->slaveId);
out.set_heartbeat_interval(HEARTBEAT_INTERVAL);
send(slave->pid, out);
@@ -675,7 +677,7 @@ void Master::operator () ()
Framework *framework = lookupFramework(task->framework_id());
if (framework != NULL) {
framework->addTask(task);
- Message<M2S_UPDATE_FRAMEWORK> out;
+ MSG<M2S_UPDATE_FRAMEWORK> out;
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
out.set_pid(framework->pid);
send(slave->pid, out);
@@ -689,7 +691,7 @@ void Master::operator () ()
}
case S2M_UNREGISTER_SLAVE: {
- const Message<S2M_UNREGISTER_SLAVE>& msg = message();
+ const MSG<S2M_UNREGISTER_SLAVE>& msg = message();
LOG(INFO) << "Asked to unregister slave " << msg.slave_id();
@@ -702,7 +704,7 @@ void Master::operator () ()
}
case S2M_STATUS_UPDATE: {
- const Message<S2M_STATUS_UPDATE>& msg = message();
+ const MSG<S2M_STATUS_UPDATE>& msg = message();
const TaskStatus& status = msg.status();
@@ -716,7 +718,7 @@ void Master::operator () ()
Framework* framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
// Pass on the (transformed) status update to the framework.
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
out.mutable_status()->MergeFrom(status);
send(framework->pid, out);
@@ -748,13 +750,13 @@ void Master::operator () ()
}
case S2M_FRAMEWORK_MESSAGE: {
- const Message<S2M_FRAMEWORK_MESSAGE>& msg = message();
+ const MSG<S2M_FRAMEWORK_MESSAGE>& msg = message();
Slave *slave = lookupSlave(msg.message().slave_id());
if (slave != NULL) {
Framework *framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
- Message<M2S_FRAMEWORK_MESSAGE> out;
+ MSG<M2S_FRAMEWORK_MESSAGE> out;
out.mutable_framework_id()->MergeFrom(msg.framework_id());
out.mutable_message()->MergeFrom(msg.message());
send(framework->pid, out);
@@ -764,7 +766,7 @@ void Master::operator () ()
}
case S2M_EXITED_EXECUTOR: {
- const Message<S2M_EXITED_EXECUTOR>&msg = message();
+ const MSG<S2M_EXITED_EXECUTOR>&msg = message();
Slave *slave = lookupSlave(msg.slave_id());
if (slave != NULL) {
@@ -780,7 +782,7 @@ void Master::operator () ()
foreachpaircopy (_, Task* task, framework->tasks) {
if (task->slave_id() == slave->slaveId &&
task->executor_id() == msg.executor_id()) {
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(task->framework_id());
TaskStatus *status = out.mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
@@ -803,7 +805,7 @@ void Master::operator () ()
}
case SH2M_HEARTBEAT: {
- const Message<SH2M_HEARTBEAT>& msg = message();
+ const MSG<SH2M_HEARTBEAT>& msg = message();
Slave *slave = lookupSlave(msg.slave_id());
if (slave != NULL) {
@@ -834,7 +836,7 @@ void Master::operator () ()
}
case M2M_FRAMEWORK_EXPIRED: {
- const Message<M2M_FRAMEWORK_EXPIRED>&msg = message();
+ const MSG<M2M_FRAMEWORK_EXPIRED>&msg = message();
Framework* framework = lookupFramework(msg.framework_id());
if (framework != NULL) {
@@ -889,7 +891,7 @@ void Master::operator () ()
case M2M_GET_STATE: {
state::MasterState *state = getState();
- Message<M2M_GET_STATE_REPLY> out;
+ MSG<M2M_GET_STATE_REPLY> out;
out.set_pointer((char *) &state, sizeof(state));
send(from(), out);
break;
@@ -902,6 +904,13 @@ void Master::operator () ()
return;
}
+ case vars: {
+ LOG(INFO) << "HTTP request for 'vars'";
+ const string& data = conf.str();
+ Process::send(from(), "response", data.data(), data.size());
+ break;
+ }
+
default:
LOG(ERROR) << "Received unknown message (" << msgid()
<< ") from " << from();
@@ -929,16 +938,16 @@ OfferID Master::makeOffer(Framework *fra
LOG(INFO) << "Sending " << offer << " to " << framework;
- Message<M2F_RESOURCE_OFFER> out;
+ MSG<M2F_RESOURCE_OFFER> out;
out.mutable_offer_id()->MergeFrom(offerId);
foreach (const SlaveResources& r, resources) {
- SlaveOffer* offer = out.add_offer();
+ SlaveOffer* offer = out.add_offers();
offer->mutable_slave_id()->MergeFrom(r.slave->slaveId);
offer->set_hostname(r.slave->info.hostname());
offer->mutable_resources()->MergeFrom(r.resources);
- out.add_pid(r.slave->pid);
+ out.add_pids(r.slave->pid);
}
send(framework->pid, out);
@@ -1091,7 +1100,7 @@ void Master::launchTask(Framework* frame
LOG(INFO) << "Launching " << t << " on " << slave;
- Message<M2S_RUN_TASK> out;
+ MSG<M2S_RUN_TASK> out;
out.mutable_framework()->MergeFrom(framework->info);
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
out.set_pid(framework->pid);
@@ -1115,7 +1124,7 @@ void Master::killTask(Task *task)
Slave *slave = lookupSlave(task->slave_id());
CHECK(slave != NULL);
- Message<M2S_KILL_TASK> out;
+ MSG<M2S_KILL_TASK> out;
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
out.mutable_task_id()->MergeFrom(task->task_id());
send(slave->pid, out);
@@ -1130,7 +1139,7 @@ void Master::terminateFramework(Framewor
{
LOG(INFO) << "Terminating " << framework << " due to error: " << message;
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(code);
out.set_message(message);
send(framework->pid, out);
@@ -1159,7 +1168,7 @@ void Master::removeSlotOffer(SlotOffer *
// Also send framework a rescind message unless the reason we are
// removing the offer is that the framework replied to it
if (reason != ORR_FRAMEWORK_REPLIED) {
- Message<M2F_RESCIND_OFFER> out;
+ MSG<M2F_RESCIND_OFFER> out;
out.mutable_offer_id()->MergeFrom(offer->offerId);
send(framework->pid, out);
}
@@ -1181,7 +1190,7 @@ void Master::addFramework(Framework *fra
pidToFrameworkId[framework->pid] = framework->frameworkId;
link(framework->pid);
- Message<M2F_REGISTER_REPLY> out;
+ MSG<M2F_REGISTER_REPLY> out;
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
send(framework->pid, out);
@@ -1201,7 +1210,7 @@ void Master::failoverFramework(Framework
removeSlotOffer(offer, ORR_FRAMEWORK_FAILOVER, offer->resources);
}
- Message<M2F_ERROR> out;
+ MSG<M2F_ERROR> out;
out.set_code(1);
out.set_message("Framework failover");
send(oldPid, out);
@@ -1224,7 +1233,7 @@ void Master::failoverFramework(Framework
// Make sure we can get offers again.
framework->active = true;
- Message<M2F_REGISTER_REPLY> reply;
+ MSG<M2F_REGISTER_REPLY> reply;
reply.mutable_framework_id()->MergeFrom(framework->frameworkId);
send(newPid, reply);
}
@@ -1239,7 +1248,7 @@ void Master::removeFramework(Framework *
// Tell slaves to kill the framework
foreachpair (_, Slave *slave, slaves) {
- Message<M2S_KILL_FRAMEWORK> out;
+ MSG<M2S_KILL_FRAMEWORK> out;
out.mutable_framework_id()->MergeFrom(framework->frameworkId);
send(slave->pid, out);
}
@@ -1287,7 +1296,7 @@ void Master::removeSlave(Slave *slave)
// framework until it fails over. See the TODO above in
// S2M_REREGISTER_SLAVE.
if (framework != NULL) {
- Message<M2F_STATUS_UPDATE> out;
+ MSG<M2F_STATUS_UPDATE> out;
out.mutable_framework_id()->MergeFrom(task->framework_id());
TaskStatus *status = out.mutable_status();
status->mutable_task_id()->MergeFrom(task->task_id());
@@ -1317,7 +1326,7 @@ void Master::removeSlave(Slave *slave)
// Send lost-slave message to all frameworks (this helps them re-run
// previously finished tasks whose output was on the lost slave)
foreachpair (_, Framework *framework, frameworks) {
- Message<M2F_LOST_SLAVE> out;
+ MSG<M2F_LOST_SLAVE> out;
out.mutable_slave_id()->MergeFrom(slave->slaveId);
send(framework->pid, out);
}
Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun 5 09:08:02 2011
@@ -12,7 +12,7 @@
#include <string>
#include <vector>
-#include <reliable.hpp>
+#include <process.hpp>
#include <glog/logging.h>
@@ -97,7 +97,7 @@ protected:
do {
switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
case PROCESS_TIMEOUT: {
- Message<M2M_FRAMEWORK_EXPIRED> msg;
+ MSG<M2M_FRAMEWORK_EXPIRED> msg;
msg.mutable_framework_id()->set_value(frameworkId.value());
send(master, msg);
return;
Modified: incubator/mesos/trunk/src/master/webui.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/webui.cpp?rev=1132253&r1=1132252&r2=1132253&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/webui.cpp (original)
+++ incubator/mesos/trunk/src/master/webui.cpp Sun Jun 5 09:08:02 2011
@@ -87,7 +87,7 @@ public:
receive();
CHECK(msgid() == M2M_GET_STATE_REPLY);
- const Message<M2M_GET_STATE_REPLY>& msg = message();
+ const MSG<M2M_GET_STATE_REPLY>& msg = message();
masterState =
*(state::MasterState **) msg.pointer().data();
Added: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132253&view=auto
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (added)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun 5 09:08:02 2011
@@ -0,0 +1,109 @@
+#include "messaging/messages.hpp"
+
+
+namespace mesos { namespace internal {
+
+boost::unordered_map<std::string, MSGID> MesosProcess::ids;
+boost::unordered_map<MSGID, std::string> MesosProcess::names;
+
+
+static struct Initialization
+{
+ Initialization()
+ {
+ MesosProcess::ids[EXIT] = PROCESS_EXIT;
+ MesosProcess::names[PROCESS_EXIT] = EXIT;
+
+ MesosProcess::ids[TIMEOUT] = PROCESS_TIMEOUT;
+ MesosProcess::names[PROCESS_TIMEOUT] = TIMEOUT;
+ }
+} __initialization__;
+
+
+struct InitializeMessage
+{
+ InitializeMessage(const std::string& name, MSGID id)
+ {
+ MesosProcess::ids[name] = id;
+ MesosProcess::names[id] = name;
+ }
+};
+
+
+#define INITIALIZE_MESSAGE(ID) \
+ static InitializeMessage __ ## ID(#ID, ID)
+
+
+INITIALIZE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+INITIALIZE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+INITIALIZE_MESSAGE(F2M_REVIVE_OFFERS);
+INITIALIZE_MESSAGE(F2M_KILL_TASK);
+INITIALIZE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+
+INITIALIZE_MESSAGE(M2F_REGISTER_REPLY);
+INITIALIZE_MESSAGE(M2F_RESOURCE_OFFER);
+INITIALIZE_MESSAGE(M2F_RESCIND_OFFER);
+INITIALIZE_MESSAGE(M2F_STATUS_UPDATE);
+INITIALIZE_MESSAGE(M2F_LOST_SLAVE);
+INITIALIZE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(M2F_ERROR);
+
+INITIALIZE_MESSAGE(S2M_REGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_REREGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_UNREGISTER_SLAVE);
+INITIALIZE_MESSAGE(S2M_STATUS_UPDATE);
+INITIALIZE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(S2M_EXITED_EXECUTOR);
+
+INITIALIZE_MESSAGE(SH2M_HEARTBEAT);
+
+INITIALIZE_MESSAGE(M2S_REGISTER_REPLY);
+INITIALIZE_MESSAGE(M2S_REREGISTER_REPLY);
+INITIALIZE_MESSAGE(M2S_RUN_TASK);
+INITIALIZE_MESSAGE(M2S_KILL_TASK);
+INITIALIZE_MESSAGE(M2S_KILL_FRAMEWORK);
+INITIALIZE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+INITIALIZE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+INITIALIZE_MESSAGE(M2S_SHUTDOWN);
+
+INITIALIZE_MESSAGE(E2S_REGISTER_EXECUTOR);
+INITIALIZE_MESSAGE(E2S_STATUS_UPDATE);
+INITIALIZE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+
+INITIALIZE_MESSAGE(S2E_REGISTER_REPLY);
+INITIALIZE_MESSAGE(S2E_RUN_TASK);
+INITIALIZE_MESSAGE(S2E_KILL_TASK);
+INITIALIZE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+INITIALIZE_MESSAGE(S2E_KILL_EXECUTOR);
+
+#ifdef __sun__
+INITIALIZE_MESSAGE(PD2S_REGISTER_PROJD);
+INITIALIZE_MESSAGE(PD2S_PROJD_READY);
+INITIALIZE_MESSAGE(S2PD_UPDATE_RESOURCES);
+INITIALIZE_MESSAGE(S2PD_KILL_ALL);
+#endif // __sun__
+
+INITIALIZE_MESSAGE(M2M_GET_STATE);
+INITIALIZE_MESSAGE(M2M_GET_STATE_REPLY);
+INITIALIZE_MESSAGE(M2M_TIMER_TICK);
+INITIALIZE_MESSAGE(M2M_FRAMEWORK_EXPIRED);
+INITIALIZE_MESSAGE(M2M_SHUTDOWN);
+
+INITIALIZE_MESSAGE(S2S_GET_STATE);
+INITIALIZE_MESSAGE(S2S_GET_STATE_REPLY);
+INITIALIZE_MESSAGE(S2S_SHUTDOWN);
+
+INITIALIZE_MESSAGE(GOT_MASTER_TOKEN);
+INITIALIZE_MESSAGE(NEW_MASTER_DETECTED);
+INITIALIZE_MESSAGE(NO_MASTER_DETECTED);
+INITIALIZE_MESSAGE(MASTER_DETECTION_FAILURE);
+
+INITIALIZE_MESSAGE(vars);
+
+INITIALIZE_MESSAGE(MESOS_MSGID);
+
+}} // namespace mesos { namespace internal {