You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:35:57 UTC

svn commit: r1131809 - in /incubator/mesos/trunk/src/third_party/libprocess: reliable.cpp reliable.hpp tuple-impl.hpp

Author: benh
Date: Sun Jun  5 05:35:56 2011
New Revision: 1131809

URL: http://svn.apache.org/viewvc?rev=1131809&view=rev
Log:
Updates to reliable process to do the correct thing.

Modified:
    incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
    incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
    incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun  5 05:35:56 2011
@@ -41,6 +41,14 @@ public:
   ReliableSender(struct rmsg *_rmsg)
     : rmsg(_rmsg) {}
 
+  ~ReliableSender()
+  {
+    if (rmsg != NULL) {
+      free(rmsg);
+      rmsg = NULL;
+    }
+  }
+
 protected:
   void operator () ()
   {
@@ -128,7 +136,8 @@ PID ReliableProcess::origin() const
 void ReliableProcess::ack()
 {
   if (current != NULL)
-    send(current->msg.from, RELIABLE_ACK);
+    send(current->msg.from, RELIABLE_ACK, (char *) current,
+	 sizeof(struct rmsg) + current->msg.len);
 }
 
 
@@ -176,10 +185,12 @@ MSGID ReliableProcess::receive(double se
   if (current != NULL) {
     // TODO(benh): Since we ignore out-of-order messages right now, we
     // can be sure that the current message is the next in the
-    // sequence (unless it's the first message).
-    assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
-	   (recvSeqs[current->msg.from] + 1 == current->seq));
-    recvSeqs[current->msg.from] = current->seq;
+    // sequence (unless it's the first message or a duplicate).
+    if (!duplicate()) {
+      assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
+	     (recvSeqs[current->msg.from] + 1 == current->seq));
+      recvSeqs[current->msg.from] = current->seq;
+    }
     free(current);
     current = NULL;
   }
@@ -187,6 +198,24 @@ MSGID ReliableProcess::receive(double se
   do {
     MSGID id = Process::receive(secs);
     switch (id) {
+      // TODO(benh): Better validation of messages!
+      case RELIABLE_ACK: {
+	size_t length;
+	const char *data = body(&length);
+	assert(length > 0);
+	struct rmsg *rmsg = (struct rmsg *) data;
+
+	// TODO(benh): Is this really the way we want to do acks?
+	foreachpair (const PID &pid, ReliableSender *sender, senders) {
+	  assert(pid == sender->getPID());
+	  // TODO(benh): Don't look into sender's class like this ... HACK!
+	  if (rmsg->seq == sender->rmsg->seq &&
+	      rmsg->msg.to == sender->rmsg->msg.to) {
+	    send(pid, RELIABLE_ACK);
+	  }
+	}
+	continue;
+      }
       case RELIABLE_MSG: {
 	size_t length;
 	const char *data = body(&length);
@@ -206,7 +235,9 @@ MSGID ReliableProcess::receive(double se
 
 	inject(current->msg.from, current->msg.id,
 	       data + sizeof(struct rmsg), current->msg.len);
-	return Process::receive();
+
+	// Avoid recursively invoking ourselves via receive(), use receive(0)!
+	return Process::receive(0);
       }
       case PROCESS_EXIT: {
 	if (senders.find(from()) != senders.end()) {

Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun  5 05:35:56 2011
@@ -73,6 +73,9 @@ protected:
    */
   virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
 
+  /* Blocks for message indefinitely. */
+  virtual MSGID receive();
+
   /* Blocks for message at most specified seconds. */
   virtual MSGID receive(double secs);
 
@@ -97,4 +100,9 @@ inline void ReliableProcess::rsend(const
 }
 
 
+inline MSGID ReliableProcess::receive()
+{
+  return receive(0);
+}
+
 #endif /* __RELIABLE_HPP__ */

Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun  5 05:35:56 2011
@@ -580,7 +580,7 @@ protected:
 
   MSGID receive()
   {
-    return receive(0);
+    return P::receive();
   }
 
   MSGID receive(double secs)