You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:57:41 UTC
svn commit: r1132198 - in /incubator/mesos/trunk: src/master/master.cpp
src/messaging/messages.hpp src/sched/sched.cpp src/slave/slave.cpp
src/tests/master_test.cpp third_party/libprocess/reliable.cpp
third_party/libprocess/reliable.hpp
Author: benh
Date: Sun Jun 5 08:57:40 2011
New Revision: 1132198
URL: http://svn.apache.org/viewvc?rev=1132198&view=rev
Log:
Added improved support for forwarding reliable messages that allows changing the body of the forwarded message and used the new support to eliminate redundant *_FT_STATUS_UPDATE messages.
Modified:
incubator/mesos/trunk/src/master/master.cpp
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/sched/sched.cpp
incubator/mesos/trunk/src/slave/slave.cpp
incubator/mesos/trunk/src/tests/master_test.cpp
incubator/mesos/trunk/third_party/libprocess/reliable.cpp
incubator/mesos/trunk/third_party/libprocess/reliable.hpp
Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun 5 08:57:40 2011
@@ -541,25 +541,20 @@ void Master::operator () ()
break;
}
- case S2M_FT_STATUS_UPDATE: {
+ case S2M_STATUS_UPDATE: {
SlaveID sid;
FrameworkID fid;
TaskID tid;
TaskState state;
string data;
- tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
+ tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
- VLOG(1) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
if (Slave *slave = lookupSlave(sid)) {
if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework.
- // TODO(benh): Do we not want to forward the
- // S2M_FT_STATUS_UPDATE message? This seems a little tricky
- // because we really wanted to send the M2F_FT_STATUS_UPDATE
- // message.
- forward(framework->pid);
+ // Pass on the (transformed) status update to the framework.
+ forward(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
if (duplicate()) {
- LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
+ LOG(WARNING) << "Locally ignoring duplicate message with id:" << seq();
break;
}
// Update the task state locally.
@@ -575,48 +570,14 @@ void Master::operator () ()
}
}
} else {
- LOG(ERROR) << "S2M_FT_STATUS_UPDATE error: couldn't lookup "
+ LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup "
<< "framework id " << fid;
}
} else {
- LOG(ERROR) << "S2M_FT_STATUS_UPDATE error: couldn't lookup slave id "
- << sid;
- }
- break;
- }
-
- case S2M_STATUS_UPDATE: {
- SlaveID sid;
- FrameworkID fid;
- TaskID tid;
- TaskState state;
- string data;
- tie(sid, fid, tid, state, data) = unpack<S2M_STATUS_UPDATE>(body());
- if (Slave *slave = lookupSlave(sid)) {
- if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework
- send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
- // Update the task state locally
- Task *task = slave->lookupTask(fid, tid);
- if (task != NULL) {
- LOG(INFO) << "Status update: " << task << " is in state " << state;
- task->state = state;
- // Remove the task if it finished or failed
- if (state == TASK_FINISHED || state == TASK_FAILED ||
- state == TASK_KILLED || state == TASK_LOST) {
- LOG(INFO) << "Removing " << task << " because it's done";
- removeTask(task, TRR_TASK_ENDED);
- }
- }
- } else {
- LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup framework id "
- << fid;
- }
- } else {
LOG(ERROR) << "S2M_STATUS_UPDATE error: couldn't lookup slave id "
<< sid;
}
- break;
+ break;
}
case S2M_FRAMEWORK_MESSAGE: {
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 08:57:40 2011
@@ -47,7 +47,6 @@ enum MessageType {
M2F_SLOT_OFFER,
M2F_RESCIND_OFFER,
M2F_STATUS_UPDATE,
- M2F_FT_STATUS_UPDATE,
M2F_LOST_SLAVE,
M2F_FRAMEWORK_MESSAGE,
M2F_ERROR,
@@ -57,7 +56,6 @@ enum MessageType {
S2M_REREGISTER_SLAVE,
S2M_UNREGISTER_SLAVE,
S2M_STATUS_UPDATE,
- S2M_FT_STATUS_UPDATE,
S2M_FRAMEWORK_MESSAGE,
S2M_LOST_EXECUTOR,
@@ -168,6 +166,13 @@ protected:
}
+ /**
+ * Packs the tuple as the new body of the current reliable message
+ * (prefixed with the messaging version, the same framing rsend uses)
+ * and forwards it to the given PID under message id ID.
+ * @param to hop (or possibly destination)
+ * @param t tuple to serialize as the forwarded message body
+ * @return result of ReliableProcess::forward: false if the current
+ * message is not _reliable_, true otherwise.
+ */
template <MSGID ID>
+ bool forward(const PID &to, const tuple<ID> &t)
+ {
+ const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
+ // Propagate the result; falling off the end of a bool function is
+ // undefined behaviour.
+ return ReliableProcess::forward(to, ID, data.data(), data.size());
+ }
+
+ template <MSGID ID>
int rsend(const PID &to, const tuple<ID> &t)
{
const std::string &data = MESOS_MESSAGING_VERSION + "|" + std::string(t);
@@ -265,11 +270,6 @@ TUPLE(M2F_STATUS_UPDATE,
TaskState,
std::string));
-TUPLE(M2F_FT_STATUS_UPDATE,
- (TaskID,
- TaskState,
- std::string));
-
TUPLE(M2F_LOST_SLAVE,
(SlaveID));
@@ -303,13 +303,6 @@ TUPLE(S2M_STATUS_UPDATE,
TaskState,
std::string));
-TUPLE(S2M_FT_STATUS_UPDATE,
- (SlaveID,
- FrameworkID,
- TaskID,
- TaskState,
- std::string));
-
TUPLE(S2M_FRAMEWORK_MESSAGE,
(SlaveID,
FrameworkID,
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 08:57:40 2011
@@ -225,20 +225,12 @@ protected:
break;
}
- // TODO(benh): Fix forwarding issues.
-// case M2F_FT_STATUS_UPDATE: {
-// TaskID tid;
-// TaskState state;
-// string data;
-// unpack<M2F_FT_STATUS_UPDATE>(tid, state, data);
- case S2M_FT_STATUS_UPDATE: {
- SlaveID sid;
- FrameworkID fid;
- TaskID tid;
- TaskState state;
- string data;
+ case M2F_STATUS_UPDATE: {
+ TaskID tid;
+ TaskState state;
+ string data;
- tie(sid, fid, tid, state, data) = unpack<S2M_FT_STATUS_UPDATE>(body());
+ tie(tid, state, data) = unpack<M2F_STATUS_UPDATE>(body());
if (duplicate()) {
VLOG(1) << "Received a duplicate status update for tid " << tid
@@ -262,26 +254,6 @@ protected:
break;
}
- case M2F_STATUS_UPDATE: {
- TaskID tid;
- TaskState state;
- string data;
- tie(tid, state, data) = unpack<M2F_STATUS_UPDATE>(body());
-
- // Stop any status update timers we might have had running.
- if (timers.count(tid) > 0) {
- StatusUpdateTimer* timer = timers[tid];
- timers.erase(tid);
- send(timer->self(), MESOS_MSGID);
- wait(timer->self());
- delete timer;
- }
-
- TaskStatus status(tid, state, data);
- invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
- break;
- }
-
case M2F_FRAMEWORK_MESSAGE: {
FrameworkMessage msg;
tie(msg) = unpack<M2F_FRAMEWORK_MESSAGE>(body());
Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun 5 08:57:40 2011
@@ -369,8 +369,8 @@ void Slave::operator () ()
// Reliably send message and save sequence number for
// canceling later.
int seq = rsend(master, framework->pid,
- pack<S2M_FT_STATUS_UPDATE>(id, fid, tid,
- taskState, data));
+ pack<S2M_STATUS_UPDATE>(id, fid, tid,
+ taskState, data));
seqs[fid].insert(seq);
} else {
LOG(WARNING) << "Got status update for UNKNOWN task "
Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun 5 08:57:40 2011
@@ -623,7 +623,7 @@ TEST(MasterTest, SchedulerFailoverStatus
EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
.Times(1);
- EXPECT_MSG(filter, Eq(S2M_FT_STATUS_UPDATE), _, Ne(master))
+ EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
.WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
.RetiresOnSaturation();
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.cpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.cpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.cpp Sun Jun 5 08:57:40 2011
@@ -154,10 +154,10 @@ void ReliableProcess::ack()
}
-bool ReliableProcess::forward(const PID &via)
+bool ReliableProcess::forward(const PID &to)
{
if (current != NULL) {
- send(via, RELIABLE_MSG, (char *) current,
+ send(to, RELIABLE_MSG, (char *) current,
sizeof(struct rmsg) + current->msg.len);
return true;
}
@@ -166,6 +166,37 @@ bool ReliableProcess::forward(const PID
}
+/**
+ * Forwards the current reliable message to another PID after replacing
+ * its message id and body. The sequence number and the original sender
+ * are copied from the current message, so only the destination, id and
+ * payload change.
+ * @param to hop (or possibly destination)
+ * @param id message id to put on the forwarded message
+ * @param data new payload (not read when length is 0)
+ * @param length payload length in bytes
+ * @return false if there is no current reliable message or the
+ * outgoing buffer could not be allocated, true otherwise.
+ */
+bool ReliableProcess::forward(const PID &to, MSGID id, const char *data, size_t length)
+{
+ if (current != NULL) {
+ // Header and payload are sent as one contiguous buffer.
+ struct rmsg *rmsg = (struct rmsg *) malloc(sizeof(struct rmsg) + length);
+
+ // malloc can fail; report failure instead of dereferencing NULL.
+ if (rmsg == NULL)
+ return false;
+
+ // Keep the original sequence number.
+ rmsg->seq = current->seq;
+
+ // Keep the original sender; only the destination is rewritten.
+ rmsg->msg.from.pipe = current->msg.from.pipe;
+ rmsg->msg.from.ip = current->msg.from.ip;
+ rmsg->msg.from.port = current->msg.from.port;
+ rmsg->msg.to.pipe = to.pipe;
+ rmsg->msg.to.ip = to.ip;
+ rmsg->msg.to.port = to.port;
+ rmsg->msg.id = id;
+ rmsg->msg.len = length;
+
+ // The payload sits immediately after the header.
+ if (length > 0)
+ memcpy((char *) rmsg + sizeof(struct rmsg), data, length);
+
+ send(to, RELIABLE_MSG, (char *) rmsg,
+ sizeof(struct rmsg) + rmsg->msg.len);
+
+ free(rmsg);
+
+ return true;
+ }
+
+ return false;
+}
+
+
int ReliableProcess::rsend(const PID &to, MSGID id, const char *data, size_t length)
{
// Allocate/Initialize outgoing message.
Modified: incubator/mesos/trunk/third_party/libprocess/reliable.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/third_party/libprocess/reliable.hpp?rev=1132198&r1=1132197&r2=1132198&view=diff
==============================================================================
--- incubator/mesos/trunk/third_party/libprocess/reliable.hpp (original)
+++ incubator/mesos/trunk/third_party/libprocess/reliable.hpp Sun Jun 5 08:57:40 2011
@@ -62,29 +62,22 @@ protected:
/**
* Forward current message (provided it is _reliable_).
- * @param via hop (or possibly destination)
+ * @param to hop (or possibly destination)
* @return false if the current message is not _reliable_, true
* otherwise.
*/
- virtual bool forward(const PID &via);
+ virtual bool forward(const PID &to);
/**
- * Sends a _reliable_ message to PID.
- * @param to destination
- * @param id message id
- * @return sequence number of message
- */
- virtual int rsend(const PID &to, MSGID id);
-
- /**
- * Sends a _reliable_ message via another PID (meant to be
- * forwarded).
- * @param via hop
- * @param to destination
- * @param id message id
- * @return sequence number of message
+ * Transform current message with specified id and data and forward
+ * it (provided it is _reliable_). This effectively allows an
+ * intermediate receiver that shouldn't be responsible for the
+ * acknowledgement to change the body of the message as it needs.
+ * @param to hop (or possibly destination)
+ * @return false if the current message is not _reliable_, true
+ * otherwise.
*/
- virtual int rsend(const PID &via, const PID &to, MSGID id);
+ virtual bool forward(const PID &to, MSGID id, const char *data, size_t length);
/**
* Sends a _reliable_ message with data to PID.
@@ -94,7 +87,7 @@ protected:
* @param length payload length
* @return sequence number of message
*/
- virtual int rsend(const PID &to, MSGID id, const char *data, size_t length);
+ virtual int rsend(const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
/**
* Sends a _reliable_ message with data via another process (meant
@@ -106,14 +99,10 @@ protected:
* @param length payload length
* @return sequence number of message
*/
- virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data, size_t length);
+ virtual int rsend(const PID &via, const PID &to, MSGID id, const char *data = NULL, size_t length = 0);
-
- /* Blocks for message indefinitely. */
- virtual MSGID receive();
-
- /* Blocks for message at most specified seconds. */
- virtual MSGID receive(double secs);
+ /* Blocks for message at most specified seconds (0 implies forever). */
+ virtual MSGID receive(double secs = 0);
/**
* Redirect unacknowledged messages to be sent to a different PID.
@@ -137,22 +126,4 @@ private:
};
-inline int ReliableProcess::rsend(const PID &to, MSGID id)
-{
- return rsend(to, id, NULL, 0);
-}
-
-
-inline int ReliableProcess::rsend(const PID &via, const PID &to, MSGID id)
-{
- return rsend(via, to, id, NULL, 0);
-}
-
-
-inline MSGID ReliableProcess::receive()
-{
- return receive(0);
-}
-
-
#endif /* __RELIABLE_HPP__ */