You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:26:07 UTC
svn commit: r1131584 - in /incubator/mesos/trunk/src: ft_messaging.cpp
ft_messaging.hpp
Author: benh
Date: Sun Jun 5 03:26:06 2011
New Revision: 1131584
URL: http://svn.apache.org/viewvc?rev=1131584&view=rev
Log:
Moved definitions to cpp file and added doxygen comments to the hpp file
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/ft_messaging.hpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131584&r1=1131583&r2=1131584&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 03:26:06 2011
@@ -63,6 +63,63 @@ FTMessaging::FTMessaging() :
string FTMessaging::getNextId() {
return uniqPrefix + ":" + lexical_cast<string>(msgId++);
}
+
+void FTMessaging::gotAck(string ftId) {
+ DLOG(INFO) << "FT: Got ack, deleting outstanding msg " << ftId;
+ outMsgs.erase(ftId);
+}
+
+void FTMessaging::sendOutstanding() {
+ if (!master) {
+ DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
+ return;
+ }
+
+ foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
+ if (msg.count < FT_MAX_RESENDS) {
+ DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
+ Process::post(master, msg.id, msg.data.data(), msg.data.size());
+ msg.count++;
+ } else {
+ DLOG(INFO) << "FT: Not RE-sending " << msg.ftId << " reached limit " << FT_MAX_RESENDS;
+ outMsgs.erase(ftId);
+ }
+ }
+}
+
+// Careful: this function is not idempotent; a successful call advances the expected sequence number for the sender.
+bool FTMessaging::acceptMessage(string from, string ftId) {
+ if (inMsgs.find(from)==inMsgs.end()) {
+ DLOG(INFO) << "FT: new msgs seq: " << ftId;
+ inMsgs[from] = ftId;
+ return true;
+ } else {
+ string oldSeq = inMsgs[from];
+ string oldRnd = oldSeq;
+ int pos;
+ if ((pos=oldSeq.find_last_of(':'))!=string::npos ) {
+ oldSeq.erase(0,pos+1);
+ oldRnd.erase(pos,255);
+ long seqNr = lexical_cast<long>(oldSeq);
+ string nextFtId = oldRnd+":"+lexical_cast<string>(seqNr+1);
+ if (nextFtId==ftId) {
+ DLOG(INFO) << "FT: match - got ftId:" << ftId << " expecting " << nextFtId;
+ inMsgs[from] = nextFtId;
+ return true;
+ } else {
+ DLOG(INFO) << "FT: mismatch - got ftId:" << ftId << " expecting " << nextFtId;
+ return false;
+ }
+ } else {
+ DLOG(INFO) << "FT: Error parsing ftId in acceptMessage for ftId:" << ftId;
+ return false;
+ }
+ }
+}
+
+void FTMessaging::setMasterPid(const PID &mPid) {
+ master = mPid;
+}
FTMessaging *FTMessaging::instance = NULL;
Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131584&r1=1131583&r2=1131584&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun 5 03:26:06 2011
@@ -63,9 +63,12 @@ class EmptyClass {
};
struct StoredMsg {
+
StoredMsg(const string &_ftId, const string &_data, const MSGID &_id) :
ftId(_ftId), data(_data), id(_id), count(0) {}
+
StoredMsg() : ftId(""), data(""), id(), count(0) {}
+
string ftId;
string data;
MSGID id;
@@ -75,10 +78,31 @@ struct StoredMsg {
class FTMessaging {
public:
+ /**
+ * @return A singleton instance of this class; the master PID still needs to be set.
+ * @see setMasterPid()
+ */
static FTMessaging *getInstance();
+
+ /**
+ * @return A singleton instance of this class.
+ * @param master libprocess PID to current master
+ */
static FTMessaging *getInstance(PID master);
+
+ /**
+ * @return A singleton instance of this class.
+ * @param masterStr string representing libprocess PID to current master (no nexus:// prefix)
+ */
static FTMessaging *getInstance(string masterStr);
+ /**
+ * Reliably sends a message with a given fault-tolerant id.
+ * The message will also be stored in a pending set.
+ * @see getNextId().
+ * @param ftId string representing the unique FT id of the message
+ * @param msgTuple libprocess tuple<ID>
+ */
template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
{
DLOG(INFO) << "FT: sending " << ftId;
@@ -89,67 +113,37 @@ public:
DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
return;
} else
- Process::post(master, ID, msgStr.data(), msgStr.size());
- }
-
- void gotAck(string ftId) {
- DLOG(INFO) << "FT: Got ack, deleting outstanding msg " << ftId;
- outMsgs.erase(ftId);
- }
-
- void sendOutstanding() {
- if (!master) {
- DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
- return;
- }
-
- foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
- if (msg.count < FT_MAX_RESENDS) {
- DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
- Process::post(master, msg.id, msg.data.data(), msg.data.size());
- msg.count++;
- } else {
- DLOG(INFO) << "FT: Not RE-sending " << msg.ftId << " reached limit " << FT_MAX_RESENDS;
- outMsgs.erase(ftId);
- }
- }
- }
-
- // Careful: not idempotent function.
- bool acceptMessage(string from, string ftId) {
- if (inMsgs.find(from)==inMsgs.end()) {
- DLOG(INFO) << "FT: new msgs seq: " << ftId;
- inMsgs[from] = ftId;
- return true;
- } else {
- string oldSeq = inMsgs[from];
- string oldRnd = oldSeq;
- int pos;
- if ((pos=oldSeq.find_last_of(':'))!=string::npos ) {
- oldSeq.erase(0,pos+1);
- oldRnd.erase(pos,255);
- long seqNr = lexical_cast<long>(oldSeq);
- string nextFtId = oldRnd+":"+lexical_cast<string>(seqNr+1);
- if (nextFtId==ftId) {
- DLOG(INFO) << "FT: match - got ftId:" << ftId << " expecting " << nextFtId;
- inMsgs[from] = nextFtId;
- return true;
- } else {
- DLOG(INFO) << "FT: mismatch - got ftId:" << ftId << " expecting " << nextFtId;
- return false;
- }
- } else {
- DLOG(INFO) << "FT: Error parsing ftId in acceptMessage for ftId:" << ftId;
- return false;
- }
- }
+ Process::post(master, ID, msgStr.data(), msgStr.size());
}
+ /**
+ * Removes any pending message with the given id. This is to be called upon receipt of an ack for that message.
+ * @param ftId string representing the unique FT id of the message.
+ */
+ void gotAck(string ftId);
+
+ /**
+ * Attempts to re-send all pending messages to the current master. Pending messages are messages that have not been acked yet; a message that has already been re-sent FT_MAX_RESENDS times is dropped instead.
+ */
+ void sendOutstanding();
+
+ /**
+ * Checks whether a message with the given FT ID from the given node should be accepted, i.e. has not already been received.
+ * @param from libprocess PID string representing the original sender of the message
+ * @param ftId the FT ID of the message
+ * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+ */
+ bool acceptMessage(string from, string ftId);
+
+ /**
+ * @return a new unique FT ID for a message to be sent
+ */
string getNextId();
- void setMasterPid(const PID &mPid) {
- master = mPid;
- }
+ /**
+ * Sets the PID of the master (to be called when a new master comes up).
+ */
+ void setMasterPid(const PID &mPid);
private: