You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:26:07 UTC

svn commit: r1131584 - in /incubator/mesos/trunk/src: ft_messaging.cpp ft_messaging.hpp

Author: benh
Date: Sun Jun  5 03:26:06 2011
New Revision: 1131584

URL: http://svn.apache.org/viewvc?rev=1131584&view=rev
Log:
Moved definitions to cpp file and added doxygen comments to the hpp file

Modified:
    incubator/mesos/trunk/src/ft_messaging.cpp
    incubator/mesos/trunk/src/ft_messaging.hpp

Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131584&r1=1131583&r2=1131584&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun  5 03:26:06 2011
@@ -63,6 +63,63 @@ FTMessaging::FTMessaging() : 
 string FTMessaging::getNextId() {
   return uniqPrefix + ":" + lexical_cast<string>(msgId++);
 }                                                      
+  
+void FTMessaging::gotAck(string ftId) {
+  DLOG(INFO) << "FT: Got ack, deleting outstanding msg " << ftId;
+  outMsgs.erase(ftId);
+}
+
+void FTMessaging::sendOutstanding() {
+  if (!master) {
+    DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
+    return;
+  } 
+
+  foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
+    if (msg.count < FT_MAX_RESENDS) {
+      DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
+      Process::post(master, msg.id, msg.data.data(), msg.data.size());
+      msg.count++;
+    } else {
+      DLOG(INFO) << "FT: Not RE-sending " << msg.ftId << " reached limit " << FT_MAX_RESENDS;
+      outMsgs.erase(ftId);
+    }
+  }
+}
+
+// Careful: not idempotent function.
+bool FTMessaging::acceptMessage(string from, string ftId) {
+  if (inMsgs.find(from)==inMsgs.end()) {
+    DLOG(INFO) << "FT: new msgs seq: " << ftId;
+    inMsgs[from] = ftId;
+    return true;
+  } else {
+    string oldSeq = inMsgs[from]; 
+    string oldRnd = oldSeq;
+    int pos;
+    if ((pos=oldSeq.find_last_of(':'))!=string::npos ) {  
+      oldSeq.erase(0,pos+1);
+      oldRnd.erase(pos,255);
+      long seqNr = lexical_cast<long>(oldSeq);
+      string nextFtId = oldRnd+":"+lexical_cast<string>(seqNr+1);
+      if (nextFtId==ftId) {
+        DLOG(INFO) << "FT: match - got ftId:" << ftId << " expecting " << nextFtId;
+        inMsgs[from] = nextFtId;
+        return true;
+      } else {
+        DLOG(INFO) << "FT: mismatch - got ftId:" << ftId << " expecting " << nextFtId;
+        return false;
+      }
+    } else {
+      DLOG(INFO) << "FT: Error parsing ftId in acceptMessage for ftId:" << ftId;
+      return false;
+    }
+  }
+}
+
+void FTMessaging::setMasterPid(const PID &mPid) {
+  master = mPid;
+}
 
 FTMessaging *FTMessaging::instance = NULL;
 

Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131584&r1=1131583&r2=1131584&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun  5 03:26:06 2011
@@ -63,9 +63,12 @@ class EmptyClass {
 };
 
 struct StoredMsg {
+
   StoredMsg(const string &_ftId, const string &_data, const MSGID &_id) : 
     ftId(_ftId), data(_data), id(_id), count(0) {}
+
   StoredMsg() : ftId(""), data(""), id(), count(0) {}
+
   string ftId;
   string data;
   MSGID id;
@@ -75,10 +78,31 @@ struct StoredMsg {
 
 class FTMessaging {
 public:
+  /**
+   * @return A singleton instance of this class, the master string needs to be set.
+   * @see setMasterPid()
+   */
   static FTMessaging *getInstance();
+
+  /**
+   * @return A singleton instance of this class.
+   * @param master libprocess PID to current master
+   */
   static FTMessaging *getInstance(PID master);
+
+  /**
+   * @return A singleton instance of this class.
+   * @param masterStr string representing libprocess PID to current master (no nexus:// prefix)
+   */
   static FTMessaging *getInstance(string masterStr);
 
+  /**
+   * Reliably sends a message with a given fault tolerant id 
+   * The message will also be stored in a pending set.
+   * @see getNextId().
+   * @param ftId string representing the unique FT id of the message
+   * @param msgTuple libprocess tuple<ID> 
+   */
   template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
   {
     DLOG(INFO) << "FT: sending " << ftId;
@@ -89,67 +113,37 @@ public:
       DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
       return;
     }  else
-    Process::post(master, ID, msgStr.data(), msgStr.size());
-  }
-  
-  void gotAck(string ftId) {
-    DLOG(INFO) << "FT: Got ack, deleting outstanding msg " << ftId;
-    outMsgs.erase(ftId);
-  }
-
-  void sendOutstanding() {
-    if (!master) {
-      DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
-      return;
-    } 
-
-    foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
-      if (msg.count < FT_MAX_RESENDS) {
-        DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
-        Process::post(master, msg.id, msg.data.data(), msg.data.size());
-        msg.count++;
-      } else {
-        DLOG(INFO) << "FT: Not RE-sending " << msg.ftId << " reached limit " << FT_MAX_RESENDS;
-        outMsgs.erase(ftId);
-      }
-    }
-  }
-
-  // Careful: not idempotent function.
-  bool acceptMessage(string from, string ftId) {
-    if (inMsgs.find(from)==inMsgs.end()) {
-      DLOG(INFO) << "FT: new msgs seq: " << ftId;
-      inMsgs[from] = ftId;
-      return true;
-    } else {
-      string oldSeq = inMsgs[from]; 
-      string oldRnd = oldSeq;
-      int pos;
-      if ((pos=oldSeq.find_last_of(':'))!=string::npos ) {  
-        oldSeq.erase(0,pos+1);
-        oldRnd.erase(pos,255);
-        long seqNr = lexical_cast<long>(oldSeq);
-        string nextFtId = oldRnd+":"+lexical_cast<string>(seqNr+1);
-        if (nextFtId==ftId) {
-          DLOG(INFO) << "FT: match - got ftId:" << ftId << " expecting " << nextFtId;
-          inMsgs[from] = nextFtId;
-          return true;
-        } else {
-          DLOG(INFO) << "FT: mismatch - got ftId:" << ftId << " expecting " << nextFtId;
-          return false;
-        }
-      } else {
-        DLOG(INFO) << "FT: Error parsing ftId in acceptMessage for ftId:" << ftId;
-        return false;
-      }
-    }
+      Process::post(master, ID, msgStr.data(), msgStr.size());
   }
 
+  /**
+   * Removes any pending message with a given id. This is to be called upon the receipt of a message.
+   * @param ftId string representing the unique FT id of the message.
+   */
+  void gotAck(string ftId);
+
+  /**
+   * Attempts to send all pending messages to the current master. Pending messages are messages that have not been acked yet.
+   */
+  void sendOutstanding();
+
+  /**
+   * Checks if a message with FT ID from a node has already been received previously. 
+   * @param from libprocess PID string representing the original sender of the message
+   * @param ftId the FT ID of the message
+   * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+   */
+  bool acceptMessage(string from, string ftId);
+
+  /**
+   * @return a new unique FT ID for a message to be sent
+   */ 
   string getNextId();
 
-  void setMasterPid(const PID &mPid) {
-    master = mPid;
-  }
+  /**
+   * Sets the PID to the master (to be called when a new master comes up).
+   */
+  void setMasterPid(const PID &mPid);
 
 private: