You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:26:48 UTC
svn commit: r1131762 - in /incubator/mesos/trunk/src: ft_messaging.cpp
ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp
Author: benh
Date: Sun Jun 5 05:26:48 2011
New Revision: 1131762
URL: http://svn.apache.org/viewvc?rev=1131762&view=rev
Log:
Redesigned to handle timeouts for ReplyToOffers. Needs to be tested still.
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/ft_messaging.hpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 05:26:48 2011
@@ -70,8 +70,6 @@ void FTMessaging::gotAck(const string &f
void FTMessaging::deleteMessage(const string &ftId) {
struct FTStoredMsg &msg = outMsgs[ftId];
- if (msg.callback != NULL)
- delete msg.callback; // ugly and sad. shared_ptr would have been better
outMsgs.erase(ftId);
}
@@ -82,11 +80,7 @@ void FTMessaging::sendOutstanding() {
}
foreachpair(const string &ftId, struct FTStoredMsg &msg, outMsgs) {
- if (msg.callback != NULL) {
- DLOG(INFO) << "FT: calling timeout listener";
- msg.callback->timeout();
- deleteMessage(ftId);
- } else if (msg.count < FT_MAX_RESENDS) {
+ if (msg.count < FT_MAX_RESENDS) {
DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
Process::post(master, msg.id, msg.data.data(), msg.data.size());
msg.count++;
Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun 5 05:26:48 2011
@@ -29,33 +29,21 @@ class EmptyClass {
/**
- * Interface used to signal that a message has timed out.
- */
-class FTCallback {
-public:
- /**
- * Called from sendOutstanding() if a message has a callback object and it has timed out.
- */
- virtual void timeout() = 0;
-};
-
-/**
* Used in FTMessaging to store unacked messages, their count, ftId, libprocess id.
* @see FTMessaging
*/
struct FTStoredMsg {
- FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id, FTCallback *_cb = NULL) :
- ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)), callback(_cb) {}
+ FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) :
+ ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)) {}
- FTStoredMsg(bool _cb = false) : ftId(""), data(""), id(), count(1), ts(time(0)), callback(NULL) {}
+ FTStoredMsg(bool _cb = false) : ftId(""), data(""), id(), count(1), ts(time(0)) {}
string ftId;
string data;
MSGID id;
long count;
time_t ts; // not currently used
- FTCallback *callback;
};
/**
@@ -88,14 +76,13 @@ public:
* @see getNextId().
* @param ftId string representing the unique FT id of the message
* @param msgTuple libprocess tuple<ID>
- * @param FTCallback if not null, then FTCallback will be called by sendOutstanding()
*/
- template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple, FTCallback *callback = NULL)
+ template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
{
DLOG(INFO) << "FT: sending " << ftId;
string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
- outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID, callback);
+ outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID);
if (!master) {
DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
return;
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 05:26:48 2011
@@ -395,39 +395,6 @@ void Master::operator () ()
break;
}
- case F2M_FT_SLOT_OFFER_REPLY: {
- FrameworkID fid;
- OfferID oid;
- vector<TaskDescription> tasks;
- Params params;
- string ftId, senderStr;
- unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
- PID senderPid = make_pid(senderStr.c_str());
- if (!senderPid) {
- LOG(ERROR) << "Couldn't create PID out of sender string";
- }
- if (!ftMsg->acceptMessageAckTo(senderPid, ftId, senderStr)) {
- LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
- break;
- }
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- SlotOffer *offer = lookupSlotOffer(oid);
- if (offer != NULL) {
- processOfferReply(offer, tasks, params);
- } else {
- // The slot offer is gone, meaning that we rescinded it or that
- // the slave was lost; immediately report any tasks in it as lost
- foreach (TaskDescription &t, tasks) {
- send(framework->pid,
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
- }
- }
- } else
- DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
- break;
- }
-
case F2M_SLOT_OFFER_REPLY: {
FrameworkID fid;
OfferID oid;
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 05:26:48 2011
@@ -24,7 +24,6 @@ enum MessageType {
F2M_REREGISTER_FRAMEWORK,
F2M_UNREGISTER_FRAMEWORK,
F2M_SLOT_OFFER_REPLY,
- F2M_FT_SLOT_OFFER_REPLY,
F2M_REVIVE_OFFERS,
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
@@ -32,6 +31,7 @@ enum MessageType {
F2F_SLOT_OFFER_REPLY,
F2F_FRAMEWORK_MESSAGE,
+ F2F_TASK_RUNNING_STATUS,
/* From master to framework. */
M2F_REGISTER_REPLY,
@@ -144,14 +144,6 @@ TUPLE(F2M_SLOT_OFFER_REPLY,
std::vector<TaskDescription>,
Params));
-TUPLE(F2M_FT_SLOT_OFFER_REPLY,
- (std::string, /* FT ID */
- std::string, /* original sender */
- FrameworkID,
- OfferID,
- std::vector<TaskDescription>,
- Params));
-
TUPLE(F2M_REVIVE_OFFERS,
(FrameworkID));
@@ -178,7 +170,8 @@ TUPLE(F2F_SLOT_OFFER_REPLY,
TUPLE(F2F_FRAMEWORK_MESSAGE,
(FrameworkMessage));
-
+TUPLE(F2F_TASK_RUNNING_STATUS,
+ ());
TUPLE(M2F_REGISTER_REPLY,
(FrameworkID));
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131762&r1=1131761&r2=1131762&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:26:48 2011
@@ -57,7 +57,42 @@ namespace nexus { namespace internal {
* any synchronization necessary is performed.
*/
-
+class ReliableReply : public Tuple<Process> // Helper process for one task: sends its parent a TASK_LOST status update if receive(FT_TIMEOUT) returns PROCESS_TIMEOUT before F2F_TASK_RUNNING_STATUS is received.
+{
+public:
+ ReliableReply(const PID &_p, const TaskID &_tid) : // _p: PID the faked status update is sent to; _tid: task id placed in that update.
+ parent(_p), tid(_tid), terminate(false) {} // NOTE(review): members are initialized in declaration order (terminate, parent, tid), not in the order written here.
+
+protected:
+ void operator () () // Process body: loops on receive() until one of the two handled messages sets terminate.
+ {
+ link(parent); // NOTE(review): presumably requests notification when the parent exits; the switch below has no case for that message -- confirm this is intended.
+ while(!terminate) {
+ // There is no default case: any other message is ignored and receive(FT_TIMEOUT) is called again.
+ switch(receive(FT_TIMEOUT)) {
+ case F2F_TASK_RUNNING_STATUS: { // Leave the loop without sending anything to the parent.
+ terminate = true;
+ break;
+ }
+
+ case PROCESS_TIMEOUT: { // receive() reported a timeout: log it and send the parent an M2F_STATUS_UPDATE with TASK_LOST for tid.
+ terminate = true;
+ DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to ReplyToOffer timeout for tid:" << tid;
+ send(parent,
+ pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
+ break;
+ }
+
+ }
+ }
+ DLOG(INFO) << "FT: Exiting reliable reply for tid:" << tid;
+ }
+
+private:
+ bool terminate; // Loop-exit flag; false at construction, set to true by either handled case.
+ const PID parent; // Destination of the faked M2F_STATUS_UPDATE.
+ const TaskID tid; // Task id carried in the status update and in the log messages.
+};
class SchedulerProcess : public Tuple<Process>
{
@@ -81,25 +116,7 @@ private:
volatile bool terminate;
- class TimeoutListener;
- friend class TimeoutListener;
-
- class TimeoutListener : public FTCallback {
- public:
- TimeoutListener(SchedulerProcess *s, const vector<TaskDescription> t) : parent(s), tasks(t) {}
-
- virtual void timeout() {
- foreach (const TaskDescription &t, tasks) {
- DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
- parent->send(parent->self(),
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
- }
- }
-
- private:
- SchedulerProcess *parent;
- vector<TaskDescription> tasks;
- };
+ unordered_map<TaskID, ReliableReply *> reliableReplies;
public:
SchedulerProcess(const string &_master,
@@ -246,20 +263,16 @@ protected:
// Remove the offer since we saved all the PIDs we might use.
savedOffers.erase(oid);
- // TODO(alig|benh): Walk through scenario if the master dies
- // after it sends out M2S_RUN_TASK messages?
-
if (isFT) {
- TimeoutListener *tListener = new TimeoutListener(this, tasks);
+ foreach(const TaskDescription &task, tasks) {
+ ReliableReply *rr = new ReliableReply(self(), task.taskId);
+ reliableReplies[task.taskId] = rr;
+ link(spawn(rr));
+ }
+ }
+
+ send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
- string ftId = ftMsg->getNextId();
- DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
- ftMsg->reliableSend(ftId,
- pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
- tListener);
- } else {
- send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
- }
break;
}
@@ -294,6 +307,11 @@ protected:
break;
DLOG(INFO) << "FT: Received message with id: " << ftId;
+ if (state == TASK_RUNNING) {
+ send(reliableReplies[tid]->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+ reliableReplies.erase(tid);
+ }
+
TaskStatus status(tid, state, data);
invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
break;