You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:28:04 UTC
svn commit: r1131773 - in /incubator/mesos/trunk/src/third_party/libprocess:
Makefile.in reliable.cpp reliable.hpp
Author: benh
Date: Sun Jun 5 05:28:03 2011
New Revision: 1131773
URL: http://svn.apache.org/viewvc?rev=1131773&view=rev
Log:
Updates to ReliableProcess.
Modified:
incubator/mesos/trunk/src/third_party/libprocess/Makefile.in
incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
Modified: incubator/mesos/trunk/src/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/Makefile.in?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/Makefile.in Sun Jun 5 05:28:03 2011
@@ -21,10 +21,10 @@ CXXFLAGS += -fPIC
# Add dependency tracking to CXXFLAGS.
CXXFLAGS += -MMD -MP
-LIB_OBJ = process.o tuple.o record-process.o serialization.o fatal.o
+LIB_OBJ = process.o reliable.o tuple.o record-process.o serialization.o fatal.o
LIB = libprocess.a
-LITHE_LIB_OBJ = process_lithe.o tuple_lithe.o record-process_lithe.o serialization_lithe.o fatal_lithe.o
+LITHE_LIB_OBJ = process_lithe.o reliable_lithe.o tuple_lithe.o record-process_lithe.o serialization_lithe.o fatal_lithe.o
LITHE_LIB = libprocess_lithe.a
ifdef HAVE_LITHE
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun 5 05:28:03 2011
@@ -1,31 +1,52 @@
+#include <assert.h>
+
+#include "fatal.hpp"
+#include "foreach.hpp"
#include "reliable.hpp"
+using std::map;
+
+#define malloc(bytes) \
+ ({ void *tmp; \
+ if ((tmp = malloc(bytes)) == NULL) \
+ fatalerror("malloc"); tmp; \
+ })
+
+#define realloc(address, bytes) \
+ ({ void *tmp; \
+ if ((tmp = realloc(address, bytes)) == NULL) \
+ fatalerror("realloc"); tmp; \
+ })
+
#define TIMEOUT 10
+/*
+ * TODO(benh): Don't send structs around, this is horribly
+ * scary. Instead, either use whatever hotness we get from Avro or
+ * ProtocolBuffers, or something of the sort.
+ */
+struct rmsg
+{
+ int seq;
+ struct msg msg;
+};
+
+
class ReliableSender : public Process
{
public:
- int seq;
- struct msg *msg;
- PID via;
+ struct rmsg *rmsg;
- ReliableSender(int _seq, struct msg *_msg, const PID& _via = PID())
- : seq(_seq), msg(_msg), via(_via) {}
+ ReliableSender(struct rmsg *_rmsg)
+ : rmsg(_rmsg) {}
protected:
void operator () ()
{
do {
- if (via == PID()) {
- fatal("unimplemented");
- //string data = pack<RELIABLE_MSG>(seq, msg);
- send(msg->to, RELIABLE_MSG, data.data(), data.size());
- } else {
- fatal("unimplemented");
- //string data = pack<RELIABLE_RELAY>(seq, msg);
- send(via, RELIABLE_RELAY, data.data(), data.size());
- }
+ send(rmsg->msg.to, RELIABLE_MSG, (char *) rmsg,
+ sizeof(struct rmsg) + rmsg->msg.len);
switch (receive(TIMEOUT)) {
case RELIABLE_ACK: {
@@ -33,8 +54,7 @@ protected:
return;
}
case RELIABLE_REDIRECT: {
- const PID &pid = *static_cast<const PID *>(body());
- msg->to = pid;
+ rmsg->msg.to = *reinterpret_cast<const PID *>(body(NULL));
break;
}
case PROCESS_TIMEOUT: {
@@ -47,66 +67,94 @@ protected:
};
-ReliableProcess::ReliableProcess() : seq(0) {}
+ReliableProcess::ReliableProcess() : current(NULL) {}
-ReliableProcess::~ReliableProcess() {}
+ReliableProcess::~ReliableProcess()
+{
+ if (current != NULL) {
+ free(current);
+ current = NULL;
+ }
+}
-void ReliableProcess::redirect(const PID &old, const PID &cur)
+int ReliableProcess::seq() const
{
- // Send a redirect to all running senders and update internal mapping.
- foreachpair (const PID &pid, ReliableSender *sender, senders) {
- // TODO(benh): Don't look into sender's class like this ... HACK!
- if (old == sender->msg->to) {
- assert(pid, sender->getPID());
- send(pid, RELIABLE_REDIRECT, &cur, sizeof(PID));
+ if (current != NULL)
+ return current->seq;
+
+ return -1;
+}
+
+
+bool ReliableProcess::duplicate() const
+{
+ // TODO(benh): Since we ignore out-of-order messages right now, a
+ // duplicate message is just one that we've already seen
+ // before. Note that we don't add the sequence identifier for the
+ // current message until the next 'receive' invocation (see below).
+ if (current != NULL) {
+ map<PID, int>::const_iterator it = recvSeqs.find(current->msg.from);
+ if (it != recvSeqs.end()) {
+ int last = it->second;
+ if (last <= current->seq)
+ return true;
}
}
+
+ return false;
}
-void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
+PID ReliableProcess::origin() const
{
- // Allocate/Initialize outgoing message.
- struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+ if (current != NULL)
+ return current->msg.from;
+
+ return PID();
+}
- msg->from.pipe = self().pipe;
- msg->from.ip = self().ip;
- msg->from.port = self().port;
- msg->to.pipe = to.pipe;
- msg->to.ip = to.ip;
- msg->to.port = to.port;
- msg->id = id;
- msg->len = length;
- if (length > 0)
- memcpy((char *) msg + sizeof(struct msg), data, length);
+void ReliableProcess::ack()
+{
+ if (current != NULL)
+ send(current->msg.from, RELIABLE_ACK);
+}
- ReliableSender *sender = new ReliableSender(seq++, msg);
- PID pid = link(spawn(sender));
- senders[pid] = sender;
+
+bool ReliableProcess::forward(const PID &to)
+{
+ if (current != NULL) {
+ send(to, RELIABLE_MSG, (char *) current,
+ sizeof(struct rmsg) + current->msg.len);
+ return true;
+ }
+
+ return false;
}
-void ReliableProcess::relay(const PID &via, const PID &to, MSGID id, const char *data, size_t length)
+void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
{
// Allocate/Initialize outgoing message.
- struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+ struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
+
+ rmsg->seq = sentSeqs[to]++;
- msg->from.pipe = self().pipe;
- msg->from.ip = self().ip;
- msg->from.port = self().port;
- msg->to.pipe = to.pipe;
- msg->to.ip = to.ip;
- msg->to.port = to.port;
- msg->id = id;
- msg->len = length;
+ rmsg->msg.from.pipe = self().pipe;
+ rmsg->msg.from.ip = self().ip;
+ rmsg->msg.from.port = self().port;
+ rmsg->msg.to.pipe = to.pipe;
+ rmsg->msg.to.ip = to.ip;
+ rmsg->msg.to.port = to.port;
+ rmsg->msg.id = id;
+ rmsg->msg.len = length;
if (length > 0)
- memcpy((char *) msg + sizeof(struct msg), data, length);
+ memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
- ReliableSender *sender = new ReliableSender(seq++, msg, via);
+ ReliableSender *sender = new ReliableSender(rmsg);
PID pid = link(spawn(sender));
senders[pid] = sender;
}
@@ -114,18 +162,41 @@ void ReliableProcess::relay(const PID &v
MSGID ReliableProcess::receive(time_t secs)
{
+ // Record sequence number for current (now old) _reliable_ message
+ // and also free the message.
+ if (current != NULL) {
+ // TODO(benh): Since we ignore out-of-order messages right now, we
+ // can be sure that the current message is the next in the
+ // sequence (unless it's the first message).
+ assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
+ (recvSeqs[current->msg.from] + 1 == current->seq));
+ recvSeqs[current->msg.from] = current->seq;
+ free(current);
+ current = NULL;
+ }
+
do {
MSGID id = Process::receive(secs);
switch (id) {
case RELIABLE_MSG: {
- send(from(), RELIABLE_ACK);
- const struct msg *msg = static_cast<const struct msg *>(body());
- inject(msg->from, msg->id, body() + sizeof(struct msg), msg->len);
- return Process::receive();
- }
- case RELIABLE_RELAY: {
- const struct msg *msg = static_cast<const struct msg *>(body());
- inject(msg->from, msg->id, body() + sizeof(struct msg), msg->len);
+ size_t length;
+ const char *data = body(&length);
+ assert(length > 0);
+ current = (struct rmsg *) malloc(length);
+ memcpy((char *) current, data, length);
+
+ // TODO(benh): Don't ignore out-of-order messages!
+ if (recvSeqs.find(current->msg.from) != recvSeqs.end())
+ if (recvSeqs[current->msg.from] + 1 < current->seq)
+ continue;
+
+ // Note that we don't record the sequence number here so that
+ // our logic in 'duplicate' (see above) is correct. We might
+ // want to consider a more complicated mechanism for
+ // determining duplicates.
+
+ inject(current->msg.from, current->msg.id,
+ data + sizeof(struct rmsg), current->msg.len);
return Process::receive();
}
case PROCESS_EXIT: {
@@ -141,3 +212,15 @@ MSGID ReliableProcess::receive(time_t se
return id;
} while (true);
}
+
+
+void ReliableProcess::redirect(const PID &existing, const PID &updated)
+{
+ // Send a redirect to all running senders and update internal mapping.
+ foreachpair (const PID &pid, ReliableSender *sender, senders) {
+ assert(pid == sender->getPID());
+ // TODO(benh): Don't look into sender's class like this ... HACK!
+ if (existing == sender->rmsg->msg.to)
+ send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
+ }
+}
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun 5 05:28:03 2011
@@ -3,83 +3,90 @@
#include <process.hpp>
+#include <map>
+
+enum {
+ RELIABLE_MSG = PROCESS_MSGID,
+ RELIABLE_ACK,
+ RELIABLE_REDIRECT,
+ RELIABLE_MSGID
+};
+
+
+struct rmsg;
class ReliableSender;
class ReliableProcess : public Process
{
public:
- Reliable();
- ~Reliable();
+ ReliableProcess();
+ ~ReliableProcess();
protected:
/**
- * Sequence number of current message.
+ * @return sequence number of current message, or -1 if current
+ * message is not a _reliable_ message.
*/
- virtual bool seq() const;
+ virtual int seq() const;
/**
- * Whether or not current message has been seen before.
+ * @return true if current message has been seen before, otherwise
+ * false (because current message has not been seen before or is not
+ * a _reliable_ message).
*/
virtual bool duplicate() const;
/**
- * Update PID 'old' to 'cur' so that unsent messages will be
- * redirected appropriately.
- * @param old the existing PID
- * @param cur the new PID
+ * @return origin of current message, or equal to @see Process::from().
*/
- virtual void redirect(const PID &old, const PID &cur);
+ virtual PID origin() const;
/**
- * Sends a _reliable_ message to PID.
- * @param to destination
- * @param id message id
+ * Acknowledges the current message by sending an 'ack' back to the
+ * origin, or does nothing if the current message is not _reliable_.
*/
- virtual void rsend(const PID &to, MSGID id);
+ virtual void ack();
/**
- * Sends a _reliable_ message with data to PID.
+ * Forward current message (provided it is _reliable_).
* @param to destination
- * @param id message id
- * @param data payload
- * @param length payload length
+ * @return false if the current message is not _reliable_, true
+ * otherwise.
*/
- virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
+ virtual bool forward(const PID &to);
/**
- * Relay a _reliable_ message. The intermediate destination does not
- * send an ack.
- * @param via intermediate destination
+ * Sends a _reliable_ message to PID.
* @param to destination
* @param id message id
*/
- virtual void relay(const PID &via, const PID &to, MSGID id);
+ virtual void rsend(const PID &to, MSGID id);
/**
- * Relay a _reliable_ message with data. The intermediate
- * destination does not send an ack.
- * @param via intermediatedestination
+ * Sends a _reliable_ message with data to PID.
* @param to destination
* @param id message id
* @param data payload
* @param length payload length
*/
- virtual void relay(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
-
- /**
- * Forward the current _reliable_ message to PID. The destination will
- * send an ack to the sender.
- * @param to destination
- */
- virtual void forward(const PID &to);
+ virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
/* Blocks for message at most specified seconds. */
virtual MSGID receive(time_t);
+
+ /**
+ * Redirect unacknowledged messages to be sent to a different PID.
+ * @param existing the current PID
+ * @param updated the new PID
+ */
+ virtual void redirect(const PID &existing, const PID &updated);
private:
- int seq;
+ struct rmsg *current;
+ std::map<PID, int> sentSeqs;
+ std::map<PID, int> recvSeqs;
std::map<PID, ReliableSender *> senders;
};
@@ -90,10 +97,4 @@ inline void ReliableProcess::rsend(const
}
-inline void ReliableProcess::relay(const PID &via, const PID &to, MSGID id)
-{
- relay(via, to, id, NULL, 0);
-}
-
-
#endif /* __RELIABLE_HPP__ */