You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:28:00 UTC
svn commit: r1131599 - in /incubator/mesos/trunk/src: ft_messaging.cpp
ft_messaging.hpp master.cpp messages.hpp nexus_sched.cpp slave.cpp
third_party/libprocess/tuple-impl.hpp
Author: benh
Date: Sun Jun 5 03:28:00 2011
New Revision: 1131599
URL: http://svn.apache.org/viewvc?rev=1131599&view=rev
Log:
SendFrameworkMessage is now reliable (acked and resent via FTMessaging) in both directions: from frameworks to executors and from executors to frameworks. libprocess is modified to make Tuple::pack public.
Modified:
incubator/mesos/trunk/src/ft_messaging.cpp
incubator/mesos/trunk/src/ft_messaging.hpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
Modified: incubator/mesos/trunk/src/ft_messaging.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.cpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.cpp Sun Jun 5 03:28:00 2011
@@ -75,7 +75,7 @@ void FTMessaging::sendOutstanding() {
return;
}
- foreachpair( const string &ftId, struct StoredMsg &msg, outMsgs) {
+ foreachpair( const string &ftId, struct FTStoredMsg &msg, outMsgs) {
if (msg.count < FT_MAX_RESENDS) {
DLOG(INFO) << "FT: RE-sending " << msg.ftId << " attempt:" << msg.count;
Process::post(master, msg.id, msg.data.data(), msg.data.size());
@@ -117,6 +117,20 @@ bool FTMessaging::acceptMessage(string f
}
}
+/**
+ * Runs acceptMessage(from, ftId) and then sends an FT_RELAY_ACK for ftId to
+ * the current master (which relays it back to the original sender `from`).
+ * The ACK is sent even when acceptMessage rejects the message: a duplicate is
+ * normally a resend from sendOutstanding, which means the earlier ACK never
+ * arrived, so the sender must be acked again or it will keep resending.
+ * NOTE(review): the header doc says acceptMessage also returns false when the
+ * message is not "the next message expected"; if that can happen, acking such a
+ * message stops the sender's resends and the message is lost -- confirm.
+ * @param from libprocess PID string of the original sender
+ * @param ftId the FT ID of the received message
+ * @return the result of acceptMessage(from, ftId)
+ */
+bool FTMessaging::acceptMessageAck(string from, string ftId) {
+  bool accepted = acceptMessage(from, ftId);
+
+  if (!accepted)
+    LOG(WARNING) << "FT: Ignoring duplicate message with id: " << ftId;
+
+  // Same guard as reliableSend: there is nowhere to post the ACK until
+  // setMasterPid has been called.
+  if (!master) {
+    DLOG(INFO) << "FT: Not sending FT_RELAY_ACK for " << ftId << " due to NULL master PID";
+    return accepted;
+  }
+
+  DLOG(INFO) << "FT: Sending FT_RELAY_ACK for message with id: " << ftId;
+  string msgStr = Tuple<EmptyClass>::tupleToString(Tuple<EmptyClass>::pack<FT_RELAY_ACK>(ftId, from));
+  Process::post(master, FT_RELAY_ACK, msgStr.data(), msgStr.size());
+
+  return accepted;
+}
+
void FTMessaging::setMasterPid(const PID &mPid) {
master = mPid;
}
Modified: incubator/mesos/trunk/src/ft_messaging.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/ft_messaging.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/ft_messaging.hpp (original)
+++ incubator/mesos/trunk/src/ft_messaging.hpp Sun Jun 5 03:28:00 2011
@@ -62,12 +62,16 @@ using boost::unordered_set;
class EmptyClass {
};
-struct StoredMsg {
+/**
+ * Used in FTMessaging to store unacked messages, their count, ftId, libprocess id.
+ * @see FTMessaging
+ */
+struct FTStoredMsg {
- StoredMsg(const string &_ftId, const string &_data, const MSGID &_id) :
+ FTStoredMsg(const string &_ftId, const string &_data, const MSGID &_id) :
ftId(_ftId), data(_data), id(_id), count(0) {}
- StoredMsg() : ftId(""), data(""), id(), count(0) {}
+ FTStoredMsg() : ftId(""), data(""), id(), count(0) {}
string ftId;
string data;
@@ -76,6 +80,10 @@ struct StoredMsg {
};
+/**
+ * Singleton class that provides functionality for reliably sending messages,
+ * resending them on timeout, acking received messages, and dropping duplicates.
+ */
class FTMessaging {
public:
/**
@@ -107,7 +115,7 @@ public:
{
DLOG(INFO) << "FT: sending " << ftId;
string msgStr = Tuple<EmptyClass>::tupleToString(msgTuple);
- StoredMsg sm(ftId, msgStr, ID);
+ FTStoredMsg sm(ftId, msgStr, ID);
outMsgs[ftId] = sm;
if (!master) {
DLOG(INFO) << "FT: Not RE-resending due to NULL master PID";
@@ -136,12 +144,22 @@ public:
bool acceptMessage(string from, string ftId);
/**
+ * Same as acceptMessage, but also sends an ACK back to the original sender if it returns true.
+ * @param from libprocess PID string representing the original sender of the message
+ * @param ftId the FT ID of the message
+ * @return true if message has not been received before and it is the next message expected to be received, false otherwise.
+ */
+ bool acceptMessageAck(string from, string ftId);
+
+ /**
* @return a new unique FT ID for a message to be sent
*/
string getNextId();
/**
* Sets the PID to the master (to be called when a new master comes up).
+ * Important invariant: needs to be called every time the master changes in slave/master/sched.
+ * @param mPid PID to the current master
*/
void setMasterPid(const PID &mPid);
@@ -149,7 +167,7 @@ private:
PID master;
- unordered_map<string, StoredMsg> outMsgs;
+ unordered_map<string, FTStoredMsg> outMsgs;
unordered_map<string, string> inMsgs;
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:28:00 2011
@@ -403,18 +403,21 @@ void Master::operator () ()
break;
}
- case F2M_FRAMEWORK_MESSAGE: {
+ case F2M_FT_FRAMEWORK_MESSAGE: {
FrameworkID fid;
FrameworkMessage message;
- unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
+ string ftId, senderStr;
+ unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
Slave *slave = lookupSlave(message.slaveId);
if (slave != NULL) {
LOG(INFO) << "Sending framework message to " << slave;
- send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
- }
- }
+ send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
break;
}
@@ -467,13 +470,12 @@ void Master::operator () ()
}
case S2M_FT_STATUS_UPDATE: {
- string ftId;
SlaveID sid;
FrameworkID fid;
TaskID tid;
TaskState state;
string data;
- string senderStr;
+ string ftId, senderStr;
unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
@@ -485,19 +487,19 @@ void Master::operator () ()
send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
if (!ftMsg->acceptMessage(senderStr, ftId)) {
- LOG(WARNING) << "Locally ignoring duplicate message with id:" << ftId;
- } else {
- // Update the task state locally
- TaskInfo *task = slave->lookupTask(fid, tid);
- if (task != NULL) {
- LOG(INFO) << "Status update: " << task << " is in state " << state;
- task->state = state;
- // Remove the task if it finished or failed
- if (state == TASK_FINISHED || state == TASK_FAILED ||
- state == TASK_KILLED || state == TASK_LOST) {
- LOG(INFO) << "Removing " << task << " because it's done";
- removeTask(task, TRR_TASK_ENDED);
- }
+ LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+ break;
+ }
+ // Update the task state locally
+ TaskInfo *task = slave->lookupTask(fid, tid);
+ if (task != NULL) {
+ LOG(INFO) << "Status update: " << task << " is in state " << state;
+ task->state = state;
+ // Remove the task if it finished or failed
+ if (state == TASK_FINISHED || state == TASK_FAILED ||
+ state == TASK_KILLED || state == TASK_LOST) {
+ LOG(INFO) << "Removing " << task << " because it's done";
+ removeTask(task, TRR_TASK_ENDED);
}
}
} else
@@ -532,11 +534,33 @@ void Master::operator () ()
}
}
} else
- DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
- }
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
break;
}
+ case S2M_FT_FRAMEWORK_MESSAGE: {
+ SlaveID sid;
+ FrameworkID fid;
+ FrameworkMessage message;
+ string ftId, senderStr;
+ unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
+ Slave *slave = lookupSlave(sid);
+ if (slave != NULL) {
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+
+ send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
+
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
+
+ break;
+ }
+
case S2M_FRAMEWORK_MESSAGE: {
SlaveID sid;
FrameworkID fid;
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 03:28:00 2011
@@ -26,6 +26,7 @@ enum MessageType {
F2M_REVIVE_OFFERS,
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
+ F2M_FT_FRAMEWORK_MESSAGE,
/* From master to framework. */
M2F_REGISTER_REPLY,
@@ -35,6 +36,7 @@ enum MessageType {
M2F_FT_STATUS_UPDATE,
M2F_LOST_SLAVE,
M2F_FRAMEWORK_MESSAGE,
+ M2F_FT_FRAMEWORK_MESSAGE,
M2F_ERROR,
/* Used for FT. */
@@ -48,6 +50,7 @@ enum MessageType {
S2M_STATUS_UPDATE,
S2M_FT_STATUS_UPDATE,
S2M_FRAMEWORK_MESSAGE,
+ S2M_FT_FRAMEWORK_MESSAGE,
S2M_LOST_EXECUTOR,
/* From slave heart to master. */
@@ -63,6 +66,7 @@ enum MessageType {
M2S_KILL_TASK,
M2S_KILL_FRAMEWORK,
M2S_FRAMEWORK_MESSAGE,
+ M2S_FT_FRAMEWORK_MESSAGE,
M2S_SHUTDOWN, // Used in unit tests to shut down cluster
/* From executor to slave. */
@@ -143,6 +147,12 @@ TUPLE(F2M_FRAMEWORK_MESSAGE,
(FrameworkID,
FrameworkMessage));
+TUPLE(F2M_FT_FRAMEWORK_MESSAGE,
+ (std::string, /* FT ID */
+ std::string, /* original sender */
+ FrameworkID,
+ FrameworkMessage));
+
TUPLE(M2F_REGISTER_REPLY,
(FrameworkID));
@@ -171,6 +181,11 @@ TUPLE(M2F_LOST_SLAVE,
TUPLE(M2F_FRAMEWORK_MESSAGE,
(FrameworkMessage));
+TUPLE(M2F_FT_FRAMEWORK_MESSAGE,
+ (std::string, /* FT ID */
+ std::string, /* original sender */
+ FrameworkMessage));
+
TUPLE(M2F_ERROR,
(int32_t /*code*/,
std::string /*msg*/));
@@ -224,6 +239,13 @@ TUPLE(S2M_FRAMEWORK_MESSAGE,
FrameworkID,
FrameworkMessage));
+TUPLE(S2M_FT_FRAMEWORK_MESSAGE,
+ (std::string, /* ftId */
+ std::string, /* sender PID */
+ SlaveID,
+ FrameworkID,
+ FrameworkMessage));
+
TUPLE(S2M_LOST_EXECUTOR,
(SlaveID,
FrameworkID,
@@ -262,6 +284,12 @@ TUPLE(M2S_FRAMEWORK_MESSAGE,
(FrameworkID,
FrameworkMessage));
+TUPLE(M2S_FT_FRAMEWORK_MESSAGE,
+ (std::string, /* FT ID */
+ std::string, /* original sender */
+ FrameworkID,
+ FrameworkMessage));
+
TUPLE(M2S_SHUTDOWN,
());
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 03:28:00 2011
@@ -156,6 +156,7 @@ protected:
}
link(master);
+ ftMsg->setMasterPid(master);
send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
while(true) {
@@ -197,19 +198,14 @@ protected:
}
case M2F_FT_STATUS_UPDATE: {
- string ftId;
- string origPid;
TaskID tid;
TaskState state;
string data;
+ string ftId, origPid;
unpack<M2F_FT_STATUS_UPDATE>(ftId, origPid, tid, state, data);
- if (!ftMsg->acceptMessage(origPid, ftId)) {
- LOG(WARNING) << "Dropping duplicate message with id:" << ftId;
+ if (!ftMsg->acceptMessageAck(origPid, ftId))
break;
- }
- DLOG(INFO) << "Received fault tolerant message with id: " << ftId;
- DLOG(INFO) << "Sending FT_RELAY_ACK for : " << ftId;
- send(from(), pack<FT_RELAY_ACK>(ftId, origPid));
+ DLOG(INFO) << "FT: Received message with id: " << ftId;
TaskStatus status(tid, state, data);
invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
@@ -227,6 +223,20 @@ protected:
break;
}
+ case M2F_FT_FRAMEWORK_MESSAGE: {
+ FrameworkMessage msg;
+ string ftId, origPid;
+ unpack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, origPid, msg);
+
+ if (!ftMsg->acceptMessageAck(origPid, ftId))
+ break;
+
+ DLOG(INFO) << "FT: Received message with id: " << ftId;
+
+ invoke(bind(&Scheduler::frameworkMessage, sched, driver, ref(msg)));
+ break;
+ }
+
case M2F_FRAMEWORK_MESSAGE: {
FrameworkMessage msg;
unpack<M2F_FRAMEWORK_MESSAGE>(msg);
@@ -269,6 +279,7 @@ protected:
LOG(INFO) << "Connecting to Nexus master at " << master;
link(master);
+ ftMsg->setMasterPid(master);
send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
break;
}
@@ -502,8 +513,12 @@ void NexusSchedulerDriver::sendFramework
return;
}
- process->send(process->master,
- process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
+ if (process->isFT) {
+ string ftId = process->ftMsg->getNextId();
+ process->ftMsg->reliableSend( ftId, process->pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, process->self(), process->fid, message));
+ } else
+ process->send(process->master,
+ process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
}
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 03:28:00 2011
@@ -254,6 +254,24 @@ void Slave::operator () ()
break;
}
+ case M2S_FT_FRAMEWORK_MESSAGE: {
+ string ftId, origPid;
+ unpack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, origPid, fid, message);
+
+ if (!ftMsg->acceptMessageAck(origPid, ftId))
+ break;
+
+ DLOG(INFO) << "FT: Received message with id: " << ftId;
+
+ if (Executor *ex = getExecutor(fid)) {
+ send(ex->pid, pack<S2E_FRAMEWORK_MESSAGE>(message));
+ }
+ // TODO(matei): If executor is not started, queue framework message?
+ // (It's probably okay to just drop it since frameworks can have
+ // the executor send a message to the master to say when it's ready.)
+ break;
+ }
+
case M2S_FRAMEWORK_MESSAGE: {
unpack<M2S_FRAMEWORK_MESSAGE>(fid, message);
if (Executor *ex = getExecutor(fid)) {
@@ -313,9 +331,11 @@ void Slave::operator () ()
}
}
// Pass on the update to the master
- string ftId = ftMsg->getNextId();
- ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
- // send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
+ if (isFT) {
+ string ftId = ftMsg->getNextId();
+ ftMsg->reliableSend(ftId, pack<S2M_FT_STATUS_UPDATE>(ftId, self(), id, fid, tid, taskState, data));
+ } else
+ send(master, pack<S2M_STATUS_UPDATE>(id, fid, tid, taskState, data));
break;
}
@@ -323,7 +343,11 @@ void Slave::operator () ()
unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
// Set slave ID in case framework omitted it
message.slaveId = this->id;
- send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
+ if (isFT) {
+ string ftId = ftMsg->getNextId();
+ ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
+ } else
+ send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
break;
}
Modified: incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp?rev=1131599&r1=1131598&r2=1131599&view=diff
==============================================================================
--- incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp (original)
+++ incubator/mesos/trunk/src/third_party/libprocess/tuple-impl.hpp Sun Jun 5 03:28:00 2011
@@ -51,7 +51,7 @@ at(const tuple<ID> &r)
template <typename P>
class Tuple : public P
{
-protected:
+public:
template <MSGID ID>
static tuple<ID> pack()
{
@@ -163,6 +163,8 @@ protected:
return tuple<ID>(::boost::make_tuple(t0, t1, t2, t3, t4, t5, t6, t7, t8, t9));
}
+protected:
+
template <MSGID ID>
void unpack(typename field<0, ID>::type &t0)
{