You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:28:04 UTC

svn commit: r1131773 - in /incubator/mesos/trunk/src/third_party/libprocess: Makefile.in reliable.cpp reliable.hpp

Author: benh
Date: Sun Jun  5 05:28:03 2011
New Revision: 1131773

URL: http://svn.apache.org/viewvc?rev=1131773&view=rev
Log:
Updates to ReliableProcess.

Modified:
    incubator/mesos/trunk/src/third_party/libprocess/Makefile.in
    incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/Makefile.in
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/Makefile.in?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/Makefile.in (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/Makefile.in Sun Jun  5 05:28:03 2011
@@ -21,10 +21,10 @@ CXXFLAGS += -fPIC
 # Add dependency tracking to CXXFLAGS.
 CXXFLAGS += -MMD -MP
 
-LIB_OBJ = process.o tuple.o record-process.o serialization.o fatal.o
+LIB_OBJ = process.o reliable.o tuple.o record-process.o serialization.o fatal.o
 LIB = libprocess.a
 
-LITHE_LIB_OBJ = process_lithe.o tuple_lithe.o record-process_lithe.o serialization_lithe.o fatal_lithe.o
+LITHE_LIB_OBJ = process_lithe.o reliable_lithe.o tuple_lithe.o record-process_lithe.o serialization_lithe.o fatal_lithe.o
 LITHE_LIB = libprocess_lithe.a
 
 ifdef HAVE_LITHE

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun  5 05:28:03 2011
@@ -1,31 +1,52 @@
+#include <assert.h>
+
+#include "fatal.hpp"
+#include "foreach.hpp"
 #include "reliable.hpp"
 
+using std::map;
+
+#define malloc(bytes)                                               \
+  ({ void *tmp;                                                     \
+     if ((tmp = malloc(bytes)) == NULL)                             \
+       fatalerror("malloc"); tmp;                                   \
+   })
+
+#define realloc(address, bytes)                                     \
+  ({ void *tmp;                                                     \
+     if ((tmp = realloc(address, bytes)) == NULL)                   \
+       fatalerror("realloc"); tmp;                                  \
+   })
+
 #define TIMEOUT 10
 
 
+/*
+ * TODO(benh): Don't send structs around, this is horribly
+ * scary. Instead, either use what ever hotness we get from Avro or
+ * ProtocolBuffers, or something of the sort.
+ */
+struct rmsg
+{
+  int seq;
+  struct msg msg;
+};
+
+
 class ReliableSender : public Process
 {
 public:
-  int seq;
-  struct msg *msg;
-  PID via;
+  struct rmsg *rmsg;
 
-  ReliableSender(int _seq, struct msg *_msg, const PID& _via = PID())
-    : seq(_seq), msg(_msg), via(_via) {}
+  ReliableSender(struct rmsg *_rmsg)
+    : rmsg(_rmsg) {}
 
 protected:
   void operator () ()
   {
     do {
-      if (via == PID()) {
-	fatal("unimplemented");
-	//string data = pack<RELIABLE_MSG>(seq, msg);
-	send(msg->to, RELIABLE_MSG, data.data(), data.size());
-      } else {
-	fatal("unimplemented");
-	//string data = pack<RELIABLE_RELAY>(seq, msg);
-	send(via, RELIABLE_RELAY, data.data(), data.size());
-      }
+      send(rmsg->msg.to, RELIABLE_MSG, (char *) rmsg,
+	   sizeof(struct rmsg) + rmsg->msg.len);
 
       switch (receive(TIMEOUT)) {
 	case RELIABLE_ACK: {
@@ -33,8 +54,7 @@ protected:
 	  return;
 	}
         case RELIABLE_REDIRECT: {
-	  const PID &pid = *static_cast<const PID *>(body());
-	  msg->to = pid;
+	  rmsg->msg.to = *reinterpret_cast<const PID *>(body(NULL));
 	  break;
 	}
         case PROCESS_TIMEOUT: {
@@ -47,66 +67,94 @@ protected:
 };
 
 
-ReliableProcess::ReliableProcess() : seq(0) {}
+ReliableProcess::ReliableProcess() : current(NULL) {}
 
 
-ReliableProcess::~ReliableProcess() {}
+ReliableProcess::~ReliableProcess()
+{
+  if (current != NULL) {
+    free(current);
+    current = NULL;
+  }
+}
 
 
-void ReliableProcess::redirect(const PID &old, const PID &cur)
+int ReliableProcess::seq() const
 {
-  // Send a redirect to all running senders and update internal mapping.
-  foreachpair (const PID &pid, ReliableSender *sender, senders) {
-    // TODO(benh): Don't look into sender's class like this ... HACK!
-    if (old == sender->msg->to) {
-      assert(pid, sender->getPID());
-      send(pid, RELIABLE_REDIRECT, &cur, sizeof(PID));
+  if (current != NULL)
+    return current->seq;
+
+  return -1;
+}
+
+
+bool ReliableProcess::duplicate() const
+{
+  // TODO(benh): Since we ignore out-of-order messages right now, a
+  // duplicate message is just one that we've already seen
+  // before. Note that we don't add the sequence identifier for the
+  // current message until the next 'receive' invocation (see below)..
+  if (current != NULL) {
+    map<PID, int>::const_iterator it = recvSeqs.find(current->msg.from);
+    if (it != recvSeqs.end()) {
+      int last = it->second;
+      if (last <= current->seq)
+	return true;
     }
   }
+
+  return false;
 }
 
 
-void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
+PID ReliableProcess::origin() const
 {
-  // Allocate/Initialize outgoing message.
-  struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+  if (current != NULL)
+    return current->msg.from;
+
+  return PID();
+}
 
-  msg->from.pipe = self().pipe;
-  msg->from.ip = self().ip;
-  msg->from.port = self().port;
-  msg->to.pipe = to.pipe;
-  msg->to.ip = to.ip;
-  msg->to.port = to.port;
-  msg->id = id;
-  msg->len = length;
 
-  if (length > 0)
-    memcpy((char *) msg + sizeof(struct msg), data, length);
+void ReliableProcess::ack()
+{
+  if (current != NULL)
+    send(current->msg.from, RELIABLE_ACK);
+}
 
-  ReliableSender *sender = new ReliableSender(seq++, msg);
-  PID pid = link(spawn(sender));
-  senders[pid] = sender;
+
+bool ReliableProcess::forward(const PID &to)
+{
+  if (current != NULL) {
+    send(to, RELIABLE_MSG, (char *) current,
+	 sizeof(struct rmsg) + current->msg.len);
+    return true;
+  }
+
+  return false;
 }
 
 
-void ReliableProcess::relay(const PID &via, const PID &to, MSGID id, const char *data, size_t length)
+void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
 {
   // Allocate/Initialize outgoing message.
-  struct msg *msg = (struct msg *) malloc(sizeof(struct msg) + length);
+  struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
+
+  rmsg->seq = sentSeqs[to]++;
 
-  msg->from.pipe = self().pipe;
-  msg->from.ip = self().ip;
-  msg->from.port = self().port;
-  msg->to.pipe = to.pipe;
-  msg->to.ip = to.ip;
-  msg->to.port = to.port;
-  msg->id = id;
-  msg->len = length;
+  rmsg->msg.from.pipe = self().pipe;
+  rmsg->msg.from.ip = self().ip;
+  rmsg->msg.from.port = self().port;
+  rmsg->msg.to.pipe = to.pipe;
+  rmsg->msg.to.ip = to.ip;
+  rmsg->msg.to.port = to.port;
+  rmsg->msg.id = id;
+  rmsg->msg.len = length;
 
   if (length > 0)
-    memcpy((char *) msg + sizeof(struct msg), data, length);
+    memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
 
-  ReliableSender *sender = new ReliableSender(seq++, msg, via);
+  ReliableSender *sender = new ReliableSender(rmsg);
   PID pid = link(spawn(sender));
   senders[pid] = sender;
 }
@@ -114,18 +162,41 @@ void ReliableProcess::relay(const PID &v
 
 MSGID ReliableProcess::receive(time_t secs)
 {
+  // Record sequence number for current (now old) _reliable_ message
+  // and also free the message.
+  if (current != NULL) {
+    // TODO(benh): Since we ignore out-of-order messages right now, we
+    // can be sure that the current message is the next in the
+    // sequence (unless it's the first message).
+    assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
+	   (recvSeqs[current->msg.from] + 1 == current->seq));
+    recvSeqs[current->msg.from] = current->seq;
+    free(current);
+    current = NULL;
+  }
+
   do {
     MSGID id = Process::receive(secs);
     switch (id) {
       case RELIABLE_MSG: {
-	send(from(), RELIABLE_ACK);
-	const struct msg *msg = static_cast<const struct msg *>(body());
-	inject(msg->from, msg->id, body() + sizeof(struct msg), msg->len);
-	return Process::receive();
-      }
-      case RELIABLE_RELAY: {
-	const struct msg *msg = static_cast<const struct msg *>(body());
-	inject(msg->from, msg->id, body() + sizeof(struct msg), msg->len);
+	size_t length;
+	const char *data = body(&length);
+	assert(length > 0);
+	current = (struct rmsg *) malloc(length);
+	memcpy((char *) current, data, length);
+
+	// TODO(benh): Don't ignore out-of-order messages!
+	if (recvSeqs.find(current->msg.from) != recvSeqs.end())
+	  if (recvSeqs[current->msg.from] + 1 < current->seq)
+	    continue;
+
+	// Note that we don't record the sequence number here so that
+	// our logic in 'duplicate' (see above) is correct. We might
+	// want to consider a more complicated mechanism for
+	// determining duplicates.
+
+	inject(current->msg.from, current->msg.id,
+	       data + sizeof(struct rmsg), current->msg.len);
 	return Process::receive();
       }
       case PROCESS_EXIT: {
@@ -141,3 +212,15 @@ MSGID ReliableProcess::receive(time_t se
     return id;
   } while (true);
 }
+
+
+void ReliableProcess::redirect(const PID &existing, const PID &updated)
+{
+  // Send a redirect to all running senders and update internal mapping.
+  foreachpair (const PID &pid, ReliableSender *sender, senders) {
+    assert(pid == sender->getPID());
+    // TODO(benh): Don't look into sender's class like this ... HACK!
+    if (existing == sender->rmsg->msg.to)
+      send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
+  }
+}

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131773&r1=1131772&r2=1131773&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun  5 05:28:03 2011
@@ -3,83 +3,90 @@
 
 #include <process.hpp>
 
+#include <map>
 
+
+enum {
+  RELIABLE_MSG = PROCESS_MSGID,
+  RELIABLE_ACK,
+  RELIABLE_REDIRECT,
+  RELIABLE_MSGID
+};
+
+
+struct rmsg; 
 class ReliableSender;
 
 
 class ReliableProcess : public Process
 {
 public:
-  Reliable();
-  ~Reliable();
+  ReliableProcess();
+  ~ReliableProcess();
 
 protected:
   /**
-   * Sequence number of current message.
+   * @return sequence number of current _message, or -1 if current
+   * message is not a _reliable_ message.
    */
-  virtual bool seq() const;
+  virtual int seq() const;
 
   /**
-   * Whether or not current message has been seen before.
+   * @return true if current message has been seen before, otherwise
+   * false (because current message has not been seen before or is not
+   * a _reliable_ message).
    */
   virtual bool duplicate() const;
 
   /**
-   * Update PID 'old' to 'cur' so that unsent messages will be
-   * redirected appropriately.
-   * @param old the existing PID
-   * @param cur the new PID
+   * @return origin of current message, or equal to @see Process::from().
    */
-  virtual void redirect(const PID &old, const PID &cur);
+  virtual PID origin() const;
 
   /**
-   * Sends a _reliable_ message to PID.
-   * @param to destination
-   * @param id message id
+   * Acknowledges the current message by sending an 'ack' back to the
+   * origin, or does nothing if the current message is not _reliable_.
    */
-  virtual void rsend(const PID &to, MSGID id);
+  virtual void ack();
 
   /**
-   * Sends a _reliable_ message with data to PID.
+   * Forward current message (provided it is _reliable_).
    * @param to destination
-   * @param id message id
-   * @param data payload
-   * @param length payload length
+   * @return false if the current message is not _reliable_, true
+   * otherwise.
    */
-  virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
+  virtual bool forward(const PID &to);
 
   /**
-   * Relay a _reliable_ message. The intermediate destination does not
-   * send an ack.
-   * @param via intermediate destination
+   * Sends a _reliable_ message to PID.
    * @param to destination
    * @param id message id
    */
-  virtual void relay(const PID &via, const PID &to, MSGID id);
+  virtual void rsend(const PID &to, MSGID id);
 
   /**
-   * Relay a _reliable_ message with data. The intermediate
-   * destination does not send an ack.
-   * @param via intermediatedestination
+   * Sends a _reliable_ message with data to PID.
    * @param to destination
    * @param id message id
    * @param data payload
    * @param length payload length
    */
-  virtual void relay(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
-
-  /**
-   * Forward the current _reliable_ message to PID. The destination will
-   * send an ack to the sender.
-   * @param to destination
-   */
-  virtual void forward(const PID &to);
+  virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
 
   /* Blocks for message at most specified seconds. */
   virtual MSGID receive(time_t);
+
+  /**
+   * Redirect unacknolwedged messages to be sent to a different PID.
+   * @param existing the current PID
+   * @param updated the new PID
+   */
+  virtual void redirect(const PID &existing, const PID &updated);
   
 private:
-  int seq;
+  struct rmsg *current;
+  std::map<PID, int> sentSeqs;
+  std::map<PID, int> recvSeqs;
   std::map<PID, ReliableSender *> senders;
 };
 
@@ -90,10 +97,4 @@ inline void ReliableProcess::rsend(const
 }
 
 
-inline void ReliableProcess::relay(const PID &via, const PID &to, MSGID id)
-{
-  relay(via, to, id, NULL, 0);
-}
-
-
 #endif /* __RELIABLE_HPP__ */