You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:35:57 UTC
svn commit: r1131809 - in /incubator/mesos/trunk/src/third_party/libprocess:
reliable.cpp reliable.hpp tuple-impl.hpp
Author: benh
Date: Sun Jun 5 05:35:56 2011
New Revision: 1131809
URL: http://svn.apache.org/viewvc?rev=1131809&view=rev
Log:
Updates to reliable process to do the correct thing.
Modified:
incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun 5 05:35:56 2011
@@ -41,6 +41,14 @@ public:
ReliableSender(struct rmsg *_rmsg)
: rmsg(_rmsg) {}
+ ~ReliableSender()
+ {
+ if (rmsg != NULL) {
+ free(rmsg);
+ rmsg = NULL;
+ }
+ }
+
protected:
void operator () ()
{
@@ -128,7 +136,8 @@ PID ReliableProcess::origin() const
void ReliableProcess::ack()
{
if (current != NULL)
- send(current->msg.from, RELIABLE_ACK);
+ send(current->msg.from, RELIABLE_ACK, (char *) current,
+ sizeof(struct rmsg) + current->msg.len);
}
@@ -176,10 +185,12 @@ MSGID ReliableProcess::receive(double se
if (current != NULL) {
// TODO(benh): Since we ignore out-of-order messages right now, we
// can be sure that the current message is the next in the
- // sequence (unless it's the first message).
- assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
- (recvSeqs[current->msg.from] + 1 == current->seq));
- recvSeqs[current->msg.from] = current->seq;
+ // sequence (unless it's the first message or a duplicate).
+ if (!duplicate()) {
+ assert((recvSeqs.find(current->msg.from) == recvSeqs.end()) ||
+ (recvSeqs[current->msg.from] + 1 == current->seq));
+ recvSeqs[current->msg.from] = current->seq;
+ }
free(current);
current = NULL;
}
@@ -187,6 +198,24 @@ MSGID ReliableProcess::receive(double se
do {
MSGID id = Process::receive(secs);
switch (id) {
+ // TODO(benh): Better validation of messages!
+ case RELIABLE_ACK: {
+ size_t length;
+ const char *data = body(&length);
+ assert(length > 0);
+ struct rmsg *rmsg = (struct rmsg *) data;
+
+ // TODO(benh): Is this really the way we want to do acks?
+ foreachpair (const PID &pid, ReliableSender *sender, senders) {
+ assert(pid == sender->getPID());
+ // TODO(benh): Don't look into sender's class like this ... HACK!
+ if (rmsg->seq == sender->rmsg->seq &&
+ rmsg->msg.to == sender->rmsg->msg.to) {
+ send(pid, RELIABLE_ACK);
+ }
+ }
+ continue;
+ }
case RELIABLE_MSG: {
size_t length;
const char *data = body(&length);
@@ -206,7 +235,9 @@ MSGID ReliableProcess::receive(double se
inject(current->msg.from, current->msg.id,
data + sizeof(struct rmsg), current->msg.len);
- return Process::receive();
+
+ // Avoid recursively invoking ourselves via receive(); use receive(0) instead!
+ return Process::receive(0);
}
case PROCESS_EXIT: {
if (senders.find(from()) != senders.end()) {
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun 5 05:35:56 2011
@@ -73,6 +73,9 @@ protected:
*/
virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
+ /* Blocks for a message indefinitely. */
+ virtual MSGID receive();
+
/* Blocks for a message for at most the specified number of seconds. */
virtual MSGID receive(double secs);
@@ -97,4 +100,9 @@ inline void ReliableProcess::rsend(const
}
+inline MSGID ReliableProcess::receive()
+{
+ return receive(0);
+}
+
#endif /* __RELIABLE_HPP__ */
Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131809&r1=1131808&r2=1131809&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun 5 05:35:56 2011
@@ -580,7 +580,7 @@ protected:
MSGID receive()
{
- return receive(0);
+ return P::receive();
}
MSGID receive(double secs)