You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:37 UTC
svn commit: r1131604 - in /incubator/mesos/trunk/src: ft_messaging.cpp
ft_messaging.hpp master.cpp memhog.cpp messages.hpp nexus_sched.cpp
Author: benh
Date: Sun Jun 5 03:28:37 2011
New Revision: 1131604
URL: http://svn.apache.org/viewvc?rev=1131604&view=rev
Log:
ReplyToOffer will now time out and send fake TASK_LOST messages to itself in the Scheduler/Framework
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/ft_messaging.hpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/memhog.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 03:28:37 2011
@@ -76,7 +76,12 @@ void FTMessaging::sendOutstanding() {
}
foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
- if (msg.count < FT_MAX_RESENDS) {
+ if (msg.callback != NULL) {
+ DLOG(INFO) << "FT: calling timeout listener";
+ msg.callback->timeout();
+ delete msg.callback; // ugly and sad. shared_ptr would have been better
+ outMsgs.erase(ftId);
+ } else if (msg.count < FT_MAX_RESENDS) {
DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
Process::post(master, msg.id, msg.data.data(), msg.data.size());
msg.count++;
Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun 5 03:28:37 2011
@@ -63,26 +63,37 @@ using boost::unordered_set;
class EmptyClass {
};
+
+/**
+ * Interface used to signal that a message has timed out.
+ */
+class FTCallback {
+public:
+ /**
+ * Called from sendOutstanding() if a message has a callback object and it has timed out.
+ */
+ virtual void timeout() = 0;
+};
+
/**
* Used in FTMessaging to store unacked messages, their count, ftId, libprocess id.
* @see FTMessaging
*/
struct FTStoredMsg {
- FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) :
- ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)) {}
+ FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id, FTCallback *_cb=NULL) :
+ ftId(_ftId), data(_data), id(_id), count(1), ts(time(0)), callback(_cb) {}
- FTStoredMsg() : ftId(""), data(""), id(), count(1), ts(time(0)) {}
+ FTStoredMsg(bool _cb=false) : ftId(""), data(""), id(), count(1), ts(time(0)), callback(NULL) {}
string ftId;
string data;
MSGID id;
long count;
time_t ts; // not currently used
+ FTCallback *callback;
};
-
-
/**
* Singleton class that provides functionality for reliably sending messages,
* resending them on timeout, acking received messages, and dropping duplicates.
@@ -113,13 +124,14 @@ public:
* @see getNextId().
* @param ftId string representing the unique FT id of the message
* @param msgTuple libprocess tuple<ID>
+ * @param callback if not null, its timeout() method will be called by sendOutstanding()
*/
- template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple)
+ template<MSGID ID> void reliableSend(const string &ftId, const tuple<ID> &msgTuple, FTCallback *callback=NULL)
{
DLOG(INFO) << "FT: sending " << ftId;
string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
- FTStoredMsg sm(ftId, msgStr, ID);
- outMsgs[ftId] = sm;
+
+ outMsgs[ftId] = FTStoredMsg(ftId, msgStr, ID, callback);
if (!master) {
DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
return;
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:28:37 2011
@@ -353,6 +353,35 @@ void Master::operator () ()
break;
}
+ case F2M_FT_SLOT_OFFER_REPLY: {
+ FrameworkID fid;
+ OfferID oid;
+ vector<TaskDescription> tasks;
+ Params params;
+ string ftId, senderStr;
+ unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
+ if (!ftMsg->acceptMessageAck(senderStr, ftId)) {
+ LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+ break;
+ }
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+ SlotOffer *offer = lookupSlotOffer(oid);
+ if (offer != NULL) {
+ processOfferReply(offer, tasks, params);
+ } else {
+ // The slot offer is gone, meaning that we rescinded it or that
+ // the slave was lost; immediately report any tasks in it as lost
+ foreach (TaskDescription &t, tasks) {
+ send(framework->pid,
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ }
+ }
+ } else
+ DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
+ break;
+ }
+
case F2M_SLOT_OFFER_REPLY: {
FrameworkID fid;
OfferID oid;
Modified: incubator/mesos/trunk/src/memhog.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/memhog.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/memhog.cpp (original)
+++ incubator/mesos/trunk/src/memhog.cpp Sun Jun 5 03:28:37 2011
@@ -77,6 +77,10 @@ public:
virtual void statusUpdate(SchedulerDriver* d, const TaskStatus& status) {
cout << endl << "Task " << status.taskId << " is in state " << status.state << endl;
+ if (status.state == TASK_LOST)
+ {
+ cout << endl << "Task " << status.taskId << " lost. Not doing anything about it." << endl;
+ }
if (status.state == TASK_FINISHED)
tasksFinished++;
if (tasksFinished == totalTasks)
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 03:28:37 2011
@@ -23,6 +23,7 @@ enum MessageType {
F2M_REREGISTER_FRAMEWORK,
F2M_UNREGISTER_FRAMEWORK,
F2M_SLOT_OFFER_REPLY,
+ F2M_FT_SLOT_OFFER_REPLY,
F2M_REVIVE_OFFERS,
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
@@ -136,6 +137,14 @@ TUPLE(F2M_SLOT_OFFER_REPLY,
std::vector<TaskDescription>,
Params));
+TUPLE(F2M_FT_SLOT_OFFER_REPLY,
+ (std::string, /* FT ID */
+ std::string, /* original sender */
+ FrameworkID,
+ OfferID,
+ std::vector<TaskDescription>,
+ Params));
+
TUPLE(F2M_REVIVE_OFFERS,
(FrameworkID));
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131604&r1=1131603&r2=1131604&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 03:28:37 2011
@@ -1,3 +1,6 @@
+
+#define FT_TIMEOUT 10
+
#include <dlfcn.h>
#include <errno.h>
#include <pwd.h>
@@ -56,6 +59,8 @@ namespace nexus { namespace internal {
* any synchronization necessary is performed.
*/
+
+
class SchedulerProcess : public Tuple<Process>
{
public:
@@ -100,6 +105,26 @@ private:
PID parentPID;
} schedLeaderListener;
+
+ class TimeoutListener;
+ friend class TimeoutListener;
+
+ class TimeoutListener : public FTCallback {
+ public:
+ TimeoutListener(SchedulerProcess *s, vector<TaskDescription> t) : parent(s), tasks(t) {}
+
+ virtual void timeout() {
+ foreach (TaskDescription &t, tasks) {
+ DLOG(INFO) << "FT: faking M2F_STATUS_UPDATE due to timeout to server during ReplyToOffer";
+ parent->send( parent->self(),
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ }
+ }
+
+ private:
+ SchedulerProcess *parent;
+ vector<TaskDescription> tasks;
+ };
public:
SchedulerProcess(const PID &_master,
@@ -175,7 +200,7 @@ protected:
if (terminate)
return;
- switch(receive()) {
+ switch(receive(FT_TIMEOUT)) {
case M2F_REGISTER_REPLY: {
unpack<M2F_REGISTER_REPLY>(fid);
invoke(bind(&Scheduler::registered, sched, driver, fid));
@@ -214,6 +239,7 @@ protected:
case M2F_STATUS_UPDATE: {
+ DLOG(INFO) << "Got M2F_STATUS_UPDATE";
TaskID tid;
TaskState state;
string data;
@@ -283,6 +309,10 @@ protected:
send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
break;
}
+ case PROCESS_TIMEOUT: {
+ ftMsg->sendOutstanding();
+ break;
+ }
default: {
ostringstream oss;
oss << "SchedulerProcess received unknown message " << msgid()
@@ -480,6 +510,20 @@ void NexusSchedulerDriver::replyToOffer(
// TODO(benh): Do a Process::post instead?
+ if (process->isFT) {
+ SchedulerProcess::TimeoutListener *tListener =
+ new SchedulerProcess::TimeoutListener(process, tasks);
+
+ string ftId = process->ftMsg->getNextId();
+ process->ftMsg->reliableSend( ftId,
+ process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
+ process->self(),
+ process->fid,
+ offerId,
+ tasks,
+ Params(params)),
+ tListener);
+ } else
process->send(process->master,
process->pack<F2M_SLOT_OFFER_REPLY>(process->fid,
offerId,