You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:27:02 UTC

svn commit: r1131764 - /incubator/mesos/trunk/src/nexus_sched.cpp

Author: benh
Date: Sun Jun  5 05:27:02 2011
New Revision: 1131764

URL: http://svn.apache.org/viewvc?rev=1131764&view=rev
Log:
Cleaned up code for reliable replies to offers

Modified:
    incubator/mesos/trunk/src/nexus_sched.cpp

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131764&r1=1131763&r2=1131764&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:27:02 2011
@@ -57,10 +57,10 @@ namespace nexus { namespace internal {
  * any synchronization necessary is performed.
  */
 
-class ReliableReply : public Tuple<Process>
+class RbReply : public Tuple<Process>
 {    
 public:
-  ReliableReply(const PID &_p, const TaskID &_tid) : 
+  RbReply(const PID &_p, const TaskID &_tid) : 
     parent(_p), tid(_tid), terminate(false) {}
   
 protected:
@@ -69,7 +69,7 @@ protected:
     link(parent);
     while(!terminate) {
 
-      switch(receive(FT_TIMEOUT)) {
+      switch(receive(FT_TIMEOUT*3)) {
       case F2F_TASK_RUNNING_STATUS: {
         terminate = true;
         break;
@@ -116,7 +116,7 @@ private:
 
   volatile bool terminate;
 
-  unordered_map<TaskID, ReliableReply *> reliableReplies;
+  unordered_map<TaskID, RbReply *> rbReplies;
 
 public:
   SchedulerProcess(const string &_master,
@@ -265,8 +265,8 @@ protected:
 
         if (isFT) {
           foreach(const TaskDescription &task, tasks) {
-            ReliableReply *rr = new ReliableReply(self(), task.taskId);
-            reliableReplies[task.taskId] = rr;
+            RbReply *rr = new RbReply(self(), task.taskId);
+            rbReplies[task.taskId] = rr;
             link(spawn(rr));
           }
         }
@@ -308,8 +308,11 @@ protected:
         DLOG(INFO) << "FT: Received message with id: " << ftId;
 
         if (state == TASK_RUNNING) {
-          send(reliableReplies[tid]->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
-          reliableReplies.erase(tid);
+          unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+          if (it != rbReplies.end()) {
+            send(it->second->getPID(), pack<F2F_TASK_RUNNING_STATUS>());
+            rbReplies.erase(tid);
+          }
         }
 
         TaskStatus status(tid, state, data);
@@ -323,6 +326,12 @@ protected:
         string data;
         unpack<M2F_STATUS_UPDATE>(tid, state, data);
         TaskStatus status(tid, state, data);
+
+        unordered_map <TaskID, RbReply *>::iterator it = rbReplies.find(tid);
+        if (it != rbReplies.end() && it->second->getPID() == from()) { // clean rbReplies
+          rbReplies.erase(tid);
+        }
+
         invoke(bind(&Scheduler::statusUpdate, sched, driver, ref(status)));
         break;
       }