You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:27:02 UTC
svn commit: r1131764 - /incubator/mesos/trunk/src/nexus_sched.cpp
Author: benh
Date: Sun Jun 5 05:27:02 2011
New Revision: 1131764
URL: http://svn.apache.org/viewvc?rev=1131764&view=rev
Log:
Cleaned up code for reliable replies to offers
Modified:
incubator/mesos/trunk/src/nexus_sched.cpp
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131764&r1=1131763&r2=1131764&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 05:27:02 2011
@@ -57,10 +57,10 @@ namespace nexus { namespace internal {
* any synchronization necessary is performed.
*/
-class ReliableReply : public Tuple<Process>
+class RbReply : public Tuple<Process>
{
public:
- ReliableReply(const PID &_p, const TaskID &_tid) :
+ RbReply(const PID &_p, const TaskID &_tid) :
parent(_p), tid(_tid), terminate(false) {}
protected:
@@ -69,7 +69,7 @@ protected:
link(parent);
while(!terminate) {
- switch(receive(FT_TIMEOUT)) {
+ switch(receive(FT_TIMEOUT*3)) {
case F2F_TASK_RUNNING_STATUS: {
terminate = true;
break;
@@ -116,7 +116,7 @@ private:
volatile bool terminate;
- unordered_map<TaskID, ReliableReply *> reliableReplies;
+ unordered_map<TaskID, RbReply *> rbReplies;
public:
SchedulerProcess(const string &_master,
@@ -265,8 +265,8 @@ protected:
if (isFT) {
foreach(const TaskDescription &task, tasks) {
- ReliableReply *rr = new ReliableReply(self(), task.taskId);
- reliableReplies[task.taskId] = rr;
+ RbReply *rr = new RbReply(self(), task.taskId);
+ rbReplies[task.taskId] = rr;
link(spawn(rr));
}
}
@@ -308,8 +308,11 @@ protected:
DLOG(INFO) << "FT: Received message with id: " << ftId;
if (state == TASK_RUNNING) {
- send(reliableReplies[tid]->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
- reliableReplies.erase(tid);
+ unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+ if (it != rbReplies.end()) {
+ send(it->second->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+ rbReplies.erase(tid);
+ }
}
TaskStatus status(tid, state, data);
@@ -323,6 +326,12 @@ protected:
string data;
unpack<M2F_STATUS_UPDATE>(tid, state, data);
TaskStatus status(tid, state, data);
+
+ unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+ if (it != rbReplies.end() && it->second->getPID() == from()) { // clean rbReplies
+ rbReplies.erase(tid);
+ }
+
invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
break;
}