You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:41:26 UTC
svn commit: r1132101 - in /incubator/mesos/trunk: src/messaging/messages.hpp
src/sched/sched.cpp src/slave/slave.cpp third_party/libprocess/reliable.cpp
third_party/libprocess/reliable.hpp
Author: benh
Date: Sun Jun 5 08:41:25 2011
New Revision: 1132101
URL: http://svn.apache.org/viewvc?rev=1132101&view=rev
Log:
Fixed serious bug with reliable messaging that effectively made a system for sharing the cluster a system for running only one thing on the cluster at a time. :| Oops. For posterity, see commit edd68032cf56177c3a5c157858782ea31921689f for a reference to when I introduced this bug unknowingly.
(cherry picked from commit f4080a9bac48f0b416b73fd0a46f6785f2bc8cb1)
Modified:
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/third_party/libprocess/reliable.hpp
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 08:41:25 2011
@@ -164,6 +164,13 @@ protected:
return ReliableProcess::rsend(to, ID, data.data(), data.size());
}
+ template <MSGID ID>
+ int rsend(const PID &via, const PID &to, const tuple<ID> &t)
+ {
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ return ReliableProcess::rsend(via, to, ID, data.data(), data.size());
+ }
+
virtual MSGID receive() { return receive(0); }
virtual MSGID receive(double secs)
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 08:41:25 2011
@@ -77,14 +77,14 @@ protected:
case PROCESS_TIMEOUT: {
terminate = true;
- VLOG(1) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
+ VLOG(1) << "No status updates received for tid:" << tid
+ << ". Assuming task was lost.";
send(parent, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
break;
}
}
}
- VLOG(1) << "FT: Exiting reliable reply for tid:" << tid;
}
private:
@@ -178,7 +178,7 @@ protected:
PID masterPid;
tie(masterSeq, masterPid) = unpack<NEW_MASTER_DETECTED>(body());
- LOG(INFO) << "New master at " << masterPid << " with ID:" << masterSeq;
+ VLOG(1) << "New master at " << masterPid << " with ID:" << masterSeq;
redirect(master, masterPid);
master = masterPid;
@@ -283,10 +283,12 @@ protected:
tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
- if (duplicate())
+ if (duplicate()) {
+ VLOG(1) << "Received a duplicate status update for tid " << tid
+ << ", status = " << state;
break;
+ }
ack();
- VLOG(1) << "FT: Received message with id: " << seq();
unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
if (it != rbReplies.end()) {
@@ -347,7 +349,7 @@ protected:
case PROCESS_EXIT: {
// TODO(benh): Don't wait for a new master forever.
- LOG(WARNING) << "Connection to master lost .. waiting for new master.";
+ VLOG(1) << "Connection to master lost .. waiting for new master.";
break;
}
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 08:41:25 2011
@@ -343,20 +343,28 @@ void Slave::operator () ()
TaskState taskState;
string data;
tie(fid, tid, taskState, data) = unpack<E2S_STATUS_UPDATE>(body());
- LOG(INFO) << "Got status update for task " << fid << ":" << tid;
- if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
- taskState == TASK_KILLED || taskState == TASK_LOST) {
- LOG(INFO) << "Task " << fid << ":" << tid << " done";
- if (Framework *fw = getFramework(fid)) {
- fw->removeTask(tid);
- isolationModule->resourcesChanged(fw);
+
+ Framework *framework = getFramework(fid);
+ if (framework != NULL) {
+ LOG(INFO) << "Got status update for task " << fid << ":" << tid;
+ if (taskState == TASK_FINISHED || taskState == TASK_FAILED ||
+ taskState == TASK_KILLED || taskState == TASK_LOST) {
+ LOG(INFO) << "Task " << fid << ":" << tid << " done";
+
+ framework->removeTask(tid);
+ isolationModule->resourcesChanged(framework);
}
- }
- // Reliably send message and save sequence number for canceling later.
- int seq = rsend(master, pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
- taskState, data));
- seqs[fid].insert(seq);
+ // Reliably send message and save sequence number for
+ // canceling later.
+ int seq = rsend(master, framework->pid,
+ pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
+ taskState, data));
+ seqs[fid].insert(seq);
+ } else {
+ LOG(WARNING) << "Got status update for UNKNOWN task "
+ << fid << ":" << tid;
+ }
break;
}
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun 5 08:41:25 2011
@@ -4,7 +4,9 @@
#include "foreach.hpp"
#include "reliable.hpp"
+using std::make_pair;
using std::map;
+using std::pair;
#define malloc(bytes) \
({ void *tmp; \
@@ -34,10 +36,11 @@ struct rmsg
class ReliableSender : public Process
{
public:
+ PID via;
struct rmsg *rmsg;
- ReliableSender(struct rmsg *_rmsg)
- : rmsg(_rmsg) {}
+ ReliableSender(const PID &_via, struct rmsg *_rmsg)
+ : via(_via), rmsg(_rmsg) {}
~ReliableSender()
{
@@ -51,7 +54,7 @@ protected:
void operator () ()
{
do {
- send(rmsg->msg.to, RELIABLE_MSG, (char *) rmsg,
+ send(via, RELIABLE_MSG, (char *) rmsg,
sizeof(struct rmsg) + rmsg->msg.len);
switch (receive(RELIABLE_TIMEOUT)) {
@@ -59,7 +62,11 @@ protected:
// All done!
return;
}
- case RELIABLE_REDIRECT: {
+ case RELIABLE_REDIRECT_VIA: {
+ via = *reinterpret_cast<const PID *>(body(NULL));
+ break;
+ }
+ case RELIABLE_REDIRECT_TO: {
rmsg->msg.to = *reinterpret_cast<const PID *>(body(NULL));
break;
}
@@ -74,7 +81,7 @@ protected:
ReliableProcess::ReliableProcess()
- : current(NULL), nextSeq(0) {}
+ : current(NULL) {}
ReliableProcess::~ReliableProcess()
@@ -110,9 +117,11 @@ bool ReliableProcess::duplicate() const
// greater than the last one we saw. Note that we don't add the
// sequence identifier for the current message until the next
// 'receive' invocation (see below).
- if (current != NULL)
- if (recvSeqs.count(current->msg.from) > 0)
- return current->seq <= recvSeqs.find(current->msg.from)->second;
+ if (current != NULL) {
+ pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+ if (recvSeqs.count(from_to) > 0)
+ return current->seq <= recvSeqs.find(from_to)->second;
+ }
return false;
}
@@ -123,7 +132,16 @@ PID ReliableProcess::origin() const
if (current != NULL)
return current->msg.from;
- return PID();
+ return from();
+}
+
+
+PID ReliableProcess::destination() const
+{
+ if (current != NULL)
+ return current->msg.to;
+
+ return self();
}
@@ -135,10 +153,10 @@ void ReliableProcess::ack()
}
-bool ReliableProcess::forward(const PID &to)
+bool ReliableProcess::forward(const PID &via)
{
if (current != NULL) {
- send(to, RELIABLE_MSG, (char *) current,
+ send(via, RELIABLE_MSG, (char *) current,
sizeof(struct rmsg) + current->msg.len);
return true;
}
@@ -152,7 +170,36 @@ int ReliableProcess::rsend(const PID &to
// Allocate/Initialize outgoing message.
struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
- int seq = nextSeq++;
+ int seq = sentSeqs[to]++;
+
+ rmsg->seq = seq;
+
+ rmsg->msg.from.pipe = self().pipe;
+ rmsg->msg.from.ip = self().ip;
+ rmsg->msg.from.port = self().port;
+ rmsg->msg.to.pipe = to.pipe;
+ rmsg->msg.to.ip = to.ip;
+ rmsg->msg.to.port = to.port;
+ rmsg->msg.id = id;
+ rmsg->msg.len = length;
+
+ if (length > 0)
+ memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
+
+ ReliableSender *sender = new ReliableSender(to, rmsg);
+ PID pid = link(spawn(sender));
+ senders[pid] = sender;
+
+ return seq;
+}
+
+
+int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length)
+{
+ // Allocate/Initialize outgoing message.
+ struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
+
+ int seq = sentSeqs[to]++;
rmsg->seq = seq;
@@ -168,7 +215,7 @@ int ReliableProcess::rsend(const PID &to
if (length > 0)
memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
- ReliableSender *sender = new ReliableSender(rmsg);
+ ReliableSender *sender = new ReliableSender(via, rmsg);
PID pid = link(spawn(sender));
senders[pid] = sender;
@@ -185,9 +232,10 @@ MSGID ReliableProcess::receive(double se
// can be sure that the current message is the next in the
// sequence (unless it's the first message or a duplicate).
if (!duplicate()) {
- assert((recvSeqs.count(current->msg.from) == 0) ||
- (recvSeqs[current->msg.from] + 1 == current->seq));
- recvSeqs[current->msg.from] = current->seq;
+ pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+ assert((recvSeqs.count(from_to) == 0) ||
+ (recvSeqs[from_to] + 1 == current->seq));
+ recvSeqs[from_to] = current->seq;
}
free(current);
current = NULL;
@@ -222,8 +270,9 @@ MSGID ReliableProcess::receive(double se
memcpy((char *) current, data, length);
// TODO(benh): Don't ignore out-of-order messages!
- if (recvSeqs.count(current->msg.from) > 0 &&
- recvSeqs[current->msg.from] + 1 < current->seq) {
+ pair<PID, PID> from_to = make_pair(current->msg.from, current->msg.to);
+ if (recvSeqs.count(from_to) > 0 &&
+ recvSeqs[from_to] + 1 < current->seq) {
free(current);
current = NULL;
continue;
@@ -261,8 +310,10 @@ void ReliableProcess::redirect(const PID
foreachpair (const PID &pid, ReliableSender *sender, senders) {
assert(pid == sender->self());
// TODO(benh): Don't look into sender's class like this ... HACK!
+ if (existing == sender->via)
+ send(pid, RELIABLE_REDIRECT_VIA, (char *) &updated, sizeof(PID));
if (existing == sender->rmsg->msg.to)
- send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
+ send(pid, RELIABLE_REDIRECT_TO, (char *) &updated, sizeof(PID));
}
}
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132101&r1=1132100&r2=1132101&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun 5 08:41:25 2011
@@ -3,6 +3,7 @@
#include <process.hpp>
+#include <functional>
#include <map>
#define RELIABLE_TIMEOUT 10
@@ -11,7 +12,8 @@
enum {
RELIABLE_MSG = PROCESS_MSGID,
RELIABLE_ACK,
- RELIABLE_REDIRECT,
+ RELIABLE_REDIRECT_VIA,
+ RELIABLE_REDIRECT_TO,
RELIABLE_MSGID
};
@@ -41,11 +43,18 @@ protected:
virtual bool duplicate() const;
/**
- * @return origin of current message, or equal to @see Process::from().
+ * @return origin of current message (if current message is not
+ * reliable this returns Process::from()).
*/
virtual PID origin() const;
/**
+ * @return destination of current message (if current message is not
+ * reliable this returns Process::self()).
+ */
+ virtual PID destination() const;
+
+ /**
* Acknowledges the current message by sending an 'ack' back to the
* origin, or does nothing if the current message is not _reliable_.
*/
@@ -53,11 +62,11 @@ protected:
/**
* Forward current message (provided it is _reliable_).
- * @param to destination
+ * @param via hop (or possibly destination)
* @return false if the current message is not _reliable_, true
* otherwise.
*/
- virtual bool forward(const PID &to);
+ virtual bool forward(const PID &via);
/**
* Sends a _reliable_ message to PID.
@@ -68,6 +77,16 @@ protected:
virtual int rsend(const PID &to, MSGID id);
/**
+ * Sends a _reliable_ message via another PID (meant to be
+ * forwarded).
+ * @param via hop
+ * @param to destination
+ * @param id message id
+ * @return sequence number of message
+ */
+ virtual int rsend(const PID &via, const PID &to, MSGID id);
+
+ /**
* Sends a _reliable_ message with data to PID.
* @param to destination
* @param id message id
@@ -77,6 +96,19 @@ protected:
*/
virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
+ /**
+ * Sends a _reliable_ message with data via another process (meant
+ * to be forwarded).
+ * @param via hop
+ * @param to destination
+ * @param id message id
+ * @param data payload
+ * @param length payload length
+ * @return sequence number of message
+ */
+ virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
+
+
/* Blocks for message indefinitely. */
virtual MSGID receive();
@@ -99,8 +131,8 @@ protected:
private:
struct rmsg *current;
- int nextSeq;
- std::map<PID, int> recvSeqs;
+ std::map<PID, int> sentSeqs;
+ std::map<std::pair<PID, PID>, int> recvSeqs;
std::map<PID, ReliableSender *> senders;
};
@@ -111,6 +143,12 @@ inline int ReliableProcess::rsend(const
}
+inline int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id)
+{
+ return rsend(via, to, id, NULL, 0);
+}
+
+
inline MSGID ReliableProcess::receive()
{
return receive(0);