You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:43:16 UTC
svn commit: r1131873 - in /incubator/mesos/trunk/src: slave.cpp slave.hpp
third_party/libprocess/reliable.cpp third_party/libprocess/reliable.hpp
Author: benh
Date: Sun Jun 5 05:43:16 2011
New Revision: 1131873
URL: http://svn.apache.org/viewvc?rev=1131873&view=rev
Log:
Added the ability to cancel a reliable message (so that it is not continually retried). Closes #65.
Modified:
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/slave.hpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 05:43:16 2011
@@ -306,7 +306,10 @@ void Slave::operator () ()
const string &msg =
tupleToString(pack<S2M_FT_STATUS_UPDATE>(id, fid, tid, taskState,
data));
- rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
+
+ // Reliably send message and save sequence number for canceling later.
+ int seq = rsend(master, S2M_FT_STATUS_UPDATE, msg.data(), msg.size());
+ seqs[fid].insert(seq);
break;
}
@@ -434,7 +437,16 @@ void Slave::removeExecutor(FrameworkID f
void Slave::killFramework(Framework *fw)
{
LOG(INFO) << "Cleaning up framework " << fw->id;
+
+ // Cancel sending any reliable messages for this framework.
+ foreach (int seq, seqs[fw->id])
+ cancel(seq);
+
+ seqs.erase(fw->id);
+
+ // Remove its allocated resources.
fw->resources = Resources();
+
// If an executor is running, tell it to exit and kill it
if (Executor *ex = getExecutor(fw->id)) {
send(ex->pid, pack<S2E_KILL_EXECUTOR>());
Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun 5 05:43:16 2011
@@ -169,6 +169,9 @@ public:
ExecutorMap executors; // Invariant: framework will exist if executor exists
IsolationModule *isolationModule;
+ // Sequence numbers of reliable messages sent on behalf of framework.
+ unordered_map<FrameworkID, unordered_set<int> > seqs;
+
public:
Slave(Resources resources, bool local, IsolationModule *isolationModule);
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.cpp Sun Jun 5 05:43:16 2011
@@ -73,7 +73,8 @@ protected:
};
-ReliableProcess::ReliableProcess() : current(NULL) {}
+ReliableProcess::ReliableProcess()
+ : current(NULL), nextSeq(0) {}
ReliableProcess::~ReliableProcess()
@@ -151,12 +152,14 @@ bool ReliableProcess::forward(const PID
}
-void ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
+int ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
{
// Allocate/Initialize outgoing message.
struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
- rmsg->seq = sentSeqs[to]++;
+ int seq = nextSeq++;
+
+ rmsg->seq = seq;
rmsg->msg.from.pipe = self().pipe;
rmsg->msg.from.ip = self().ip;
@@ -173,6 +176,8 @@ void ReliableProcess::rsend(const PID &t
ReliableSender *sender = new ReliableSender(rmsg);
PID pid = link(spawn(sender));
senders[pid] = sender;
+
+ return seq;
}
@@ -262,3 +267,18 @@ void ReliableProcess::redirect(const PID
send(pid, RELIABLE_REDIRECT, (char *) &updated, sizeof(PID));
}
}
+
+
+void ReliableProcess::cancel(int seq)
+{
+ foreachpair (const PID &pid, ReliableSender *sender, senders) {
+ assert(pid == sender->getPID());
+ // Shut it down by sending it an ack. It will get cleaned up via
+ // the PROCESS_EXIT above.
+ // TODO(benh): Don't look into sender's class like this ... HACK!
+ if (seq == sender->rmsg->seq) {
+ send(pid, RELIABLE_ACK);
+ break;
+ }
+ }
+}
Modified: incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp?rev=1131873&r1=1131872&r2=1131873&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/reliable.hpp Sun Jun 5 05:43:16 2011
@@ -63,8 +63,9 @@ protected:
* Sends a _reliable_ message to PID.
* @param to destination
* @param id message id
+ * @return sequence number of message
*/
- virtual void rsend(const PID &to, MSGID id);
+ virtual int rsend(const PID &to, MSGID id);
/**
* Sends a _reliable_ message with data to PID.
@@ -72,8 +73,9 @@ protected:
* @param id message id
* @param data payload
* @param length payload length
+ * @return sequence number of message
*/
- virtual void rsend(const PID &to, MSGID id, const char *data, size_t length);
+ virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
/* Blocks for message indefinitely. */
virtual MSGID receive();
@@ -87,18 +89,25 @@ protected:
* @param updated the new PID
*/
virtual void redirect(const PID &existing, const PID &updated);
+
+ /**
+ * Cancel trying to reliably send the message with the specified
+ * sequence number.
+ * @param seq sequence number of message to cancel
+ */
+ virtual void cancel(int seq);
private:
struct rmsg *current;
- std::map<PID, int> sentSeqs;
+ int nextSeq;
std::map<PID, int> recvSeqs;
std::map<PID, ReliableSender *> senders;
};
-inline void ReliableProcess::rsend(const PID &to, MSGID id)
+inline int ReliableProcess::rsend(const PID &to, MSGID id)
{
- rsend(to, id, NULL, 0);
+ return rsend(to, id, NULL, 0);
}