Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:57:09 UTC

svn commit: r1132194 - in /incubator/mesos/trunk/src: messaging/messages.hpp sched/sched.cpp

Author: benh
Date: Sun Jun  5 08:57:08 2011
New Revision: 1132194

URL: http://svn.apache.org/viewvc?rev=1132194&view=rev
Log:
Updated the scheduler component to use asynchronous function dispatch for local "messages".
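
The change replaces self-addressed F2F messages (pack a tuple, send it to self(), then unpack it in the receive loop) with Process::dispatch, which queues an asynchronous member-function call to run on the SchedulerProcess's own thread. That preserves the single-threaded invariant the message loop provided while eliminating the F2F message types and their pack/unpack boilerplate. Below is a minimal, self-contained sketch of the underlying mechanism; MiniProcess and its dispatch are hypothetical stand-ins written in modern C++ for illustration, not the actual libprocess API.

    #include <condition_variable>
    #include <functional>
    #include <iostream>
    #include <mutex>
    #include <queue>
    #include <string>
    #include <thread>

    // Hypothetical illustration (not libprocess): a "process" owns a queue
    // of deferred calls that only its own worker thread drains, so callers
    // never touch its state directly -- the same serialization that
    // self-sent F2F messages used to provide.
    class MiniProcess {
    public:
      ~MiniProcess() {
        dispatch([this] { done = true; });  // Ask the loop to exit.
        worker.join();
      }

      // Queue a call; it executes later on the process's own thread and
      // the caller returns immediately, just like an asynchronous dispatch.
      void dispatch(std::function<void()> call) {
        {
          std::lock_guard<std::mutex> lock(mutex);
          calls.push(std::move(call));
        }
        ready.notify_one();
      }

      void sendFrameworkMessage(const std::string& message) {
        // Always runs on the worker thread, so 'sent' needs no locking.
        std::cout << "sending: " << message << " (#" << ++sent << ")\n";
      }

    private:
      void run() {
        while (!done) {
          std::function<void()> call;
          {
            std::unique_lock<std::mutex> lock(mutex);
            ready.wait(lock, [this] { return !calls.empty(); });
            call = std::move(calls.front());
            calls.pop();
          }
          call();  // Invoke the deferred member-function call.
        }
      }

      std::mutex mutex;
      std::condition_variable ready;
      std::queue<std::function<void()>> calls;
      int sent = 0;
      bool done = false;
      // Declared last so the queue and mutex exist before the thread starts.
      std::thread worker{[this] { run(); }};
    };

    int main() {
      MiniProcess process;
      // Analogous to
      //   Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, msg);
      // in the diff below: the driver thread enqueues the call and moves on.
      process.dispatch([&] { process.sendFrameworkMessage("hello"); });
    }

In the real code, serve(2) (replacing receive(2)) lets the process handle both incoming messages and these dispatched calls from the same loop.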

Modified:
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/sched/sched.cpp

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132194&r1=1132193&r2=1132194&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 08:57:08 2011
@@ -40,8 +40,6 @@ enum MessageType {
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
 
-  F2F_SLOT_OFFER_REPLY,
-  F2F_FRAMEWORK_MESSAGE,
   F2F_TASK_RUNNING_STATUS,
   
   /* From master to framework. */
@@ -236,14 +234,6 @@ TUPLE(F2M_FRAMEWORK_MESSAGE,
       (FrameworkID,
        FrameworkMessage));
 
-TUPLE(F2F_SLOT_OFFER_REPLY,
-      (OfferID,
-       std::vector<TaskDescription>,
-       Params));
-
-TUPLE(F2F_FRAMEWORK_MESSAGE,
-      (FrameworkMessage));
-
 TUPLE(F2F_TASK_RUNNING_STATUS,
       ());
 

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132194&r1=1132193&r2=1132194&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 08:57:08 2011
@@ -105,25 +105,6 @@ private:
 class SchedulerProcess : public MesosProcess
 {
 public:
-  friend class mesos::MesosSchedulerDriver;
-
-private:
-  MesosSchedulerDriver* driver;
-  Scheduler* sched;
-  FrameworkID fid;
-  string frameworkName;
-  ExecutorInfo execInfo;
-  int32_t generation;
-  PID master;
-
-  volatile bool terminate;
-
-  unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
-  unordered_map<SlaveID, PID> savedSlavePids;
-
-  unordered_map<TaskID, RbReply *> rbReplies;
-
-public:
   SchedulerProcess(MesosSchedulerDriver* _driver,
                    Scheduler* _sched,
 		   FrameworkID _fid,
@@ -171,7 +152,7 @@ protected:
       // send a message rather than have a timeout (see the comment
       // above for why sending a message will still require us to use
       // the terminate flag).
-      switch (receive(2)) {
+      switch (serve(2)) {
 
       case NEW_MASTER_DETECTED: {
 	string masterSeq;
@@ -228,47 +209,6 @@ protected:
         break;
       }
 
-      case F2F_SLOT_OFFER_REPLY: {
-        OfferID oid;
-        vector<TaskDescription> tasks;
-        Params params;
-        tie(oid, tasks, params) = unpack<F2F_SLOT_OFFER_REPLY>(body());
-
-	// Keep only the slave PIDs where we run tasks so we can send
-	// framework messages directly.
-        foreach(const TaskDescription &task, tasks) {
-          savedSlavePids[task.slaveId] = savedOffers[oid][task.slaveId];
-        }
-
-	// Remove the offer since we saved all the PIDs we might use.
-        savedOffers.erase(oid);
-
-	foreach(const TaskDescription &task, tasks) {
-	  RbReply *rr = new RbReply(self(), task.taskId);
-	  rbReplies[task.taskId] = rr;
-	  // TODO(benh): Link?
-	  spawn(rr);
-	}
-        
-        send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
-        break;
-      }
-
-      case F2F_FRAMEWORK_MESSAGE: {
-        FrameworkMessage msg;
-        tie(msg) = unpack<F2F_FRAMEWORK_MESSAGE>(body());
-        VLOG(1) << "Asked to send framework message to slave " << msg.slaveId;
-        if (savedSlavePids.count(msg.slaveId) > 0 &&
-            savedSlavePids[msg.slaveId] != PID()) {
-          VLOG(1) << "Saved slave PID is " << savedSlavePids[msg.slaveId];
-          send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
-        } else {
-          VLOG(1) << "No PID is saved for that slave; sending through master";
-          send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
-        }
-        break;
-      }
-
       case M2F_RESCIND_OFFER: {
         OfferID oid;
         tie(oid) = unpack<M2F_RESCIND_OFFER>(body());
@@ -376,6 +316,77 @@ protected:
       }
     }
   }
+
+  void stop()
+  {
+    send(master, pack<F2M_UNREGISTER_FRAMEWORK>(fid));
+  }
+
+  void killTask(TaskID tid)
+  {
+    send(master, pack<F2M_KILL_TASK>(fid, tid));
+  }
+
+  void replyToOffer(OfferID offerId,
+                    const vector<TaskDescription>& tasks,
+                    const map<std::string, std::string>& params)
+  {
+    // Keep only the slave PIDs where we run tasks so we can send
+    // framework messages directly.
+    foreach(const TaskDescription &task, tasks) {
+      savedSlavePids[task.slaveId] = savedOffers[offerId][task.slaveId];
+    }
+
+    // Remove the offer since we saved all the PIDs we might use.
+    savedOffers.erase(offerId);
+
+    foreach(const TaskDescription& task, tasks) {
+      RbReply *rr = new RbReply(self(), task.taskId);
+      rbReplies[task.taskId] = rr;
+      // TODO(benh): Link?
+      spawn(rr);
+    }
+        
+    send(master,
+         pack<F2M_SLOT_OFFER_REPLY>(fid, offerId, tasks, Params(params)));
+  }
+
+  void reviveOffers()
+  {
+    send(master, pack<F2M_REVIVE_OFFERS>(fid));
+  }
+
+  void sendFrameworkMessage(const FrameworkMessage& message)
+  {
+    VLOG(1) << "Asked to send framework message to slave " << message.slaveId;
+    if (savedSlavePids.count(message.slaveId) > 0 &&
+        savedSlavePids[message.slaveId] != PID()) {
+      VLOG(1) << "Saved slave PID is " << savedSlavePids[message.slaveId];
+      send(savedSlavePids[message.slaveId],
+           pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+    } else {
+      VLOG(1) << "No PID is saved for that slave; sending through master";
+      send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, message));
+    }
+  }
+
+private:
+  friend class mesos::MesosSchedulerDriver;
+
+  MesosSchedulerDriver* driver;
+  Scheduler* sched;
+  FrameworkID fid;
+  string frameworkName;
+  ExecutorInfo execInfo;
+  int32_t generation;
+  PID master;
+
+  volatile bool terminate;
+
+  unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+  unordered_map<SlaveID, PID> savedSlavePids;
+
+  unordered_map<TaskID, RbReply *> rbReplies;
 };
 
 }} /* namespace mesos { namespace internal { */
@@ -581,9 +592,7 @@ int MesosSchedulerDriver::stop()
     return -1;
   }
 
-  // TODO(benh): Do a Process::post instead?
-  process->send(process->master,
-                pack<F2M_UNREGISTER_FRAMEWORK>(process->fid));
+  Process::dispatch(process, &SchedulerProcess::stop);
 
   process->terminate = true;
 
@@ -621,10 +630,7 @@ int MesosSchedulerDriver::killTask(TaskI
     return -1;
   }
 
-  // TODO(benh): Do a Process::post instead?
-
-  process->send(process->master,
-                pack<F2M_KILL_TASK>(process->fid, tid));
+  Process::dispatch(process, &SchedulerProcess::killTask, tid);
 
   return 0;
 }
@@ -641,10 +647,8 @@ int MesosSchedulerDriver::replyToOffer(O
     return -1;
   }
 
-  // TODO(benh): Do a Process::post instead?
-  
-  process->send(process->self(),
-                pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
+  Process::dispatch(process, &SchedulerProcess::replyToOffer,
+                    offerId, tasks, params);
 
   return 0;
 }
@@ -659,16 +663,13 @@ int MesosSchedulerDriver::reviveOffers()
     return -1;
   }
 
-  // TODO(benh): Do a Process::post instead?
-
-  process->send(process->master,
-                pack<F2M_REVIVE_OFFERS>(process->fid));
+  Process::dispatch(process, &SchedulerProcess::reviveOffers);
 
   return 0;
 }
 
 
-int MesosSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+int MesosSchedulerDriver::sendFrameworkMessage(const FrameworkMessage& message)
 {
   Lock lock(&mutex);
 
@@ -677,9 +678,7 @@ int MesosSchedulerDriver::sendFrameworkM
     return -1;
   }
 
-  // TODO(benh): Do a Process::post instead?
-
-  process->send(process->self(), pack<F2F_FRAMEWORK_MESSAGE>(message));
+  Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, message);
 
   return 0;
 }