You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:30:01 UTC

svn commit: r1131614 - in /incubator/mesos/trunk: include/nexus.hpp src/master.cpp src/messages.cpp src/messages.hpp src/nexus_sched.cpp src/slave.cpp src/slave.hpp

Author: benh
Date: Sun Jun  5 03:30:00 2011
New Revision: 1131614

URL: http://svn.apache.org/viewvc?rev=1131614&view=rev
Log:
SHOULD MAYBE REMOVE THIS COMMIT. It makes FrameworkMessage go directly between Slaves and Frameworks without going through the master.
The change required saving Offers (and the PIDs of slaves) in SchedulerProcess. When a ReplyOffer comes from the Framework,
the relevant slave PIDs are saved in a map and the original Offer is deleted. This way a Scheduler can directly
look up the Slave PID and send framework messages. Similarly, the Framework struct at the Slave was augmented with the framework's PID, which
is sent by the master when M2S_RUN_TASK is sent. Hence a slave can directly send a framework message to the scheduler.

Modified:
    incubator/mesos/trunk/include/nexus.hpp
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/messages.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/slave.hpp

Modified: incubator/mesos/trunk/include/nexus.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus.hpp (original)
+++ incubator/mesos/trunk/include/nexus.hpp Sun Jun  5 03:30:00 2011
@@ -3,7 +3,7 @@
 
 #include <map>
 #include <string>
-
+#include <process.hpp>
 #include <nexus_types.hpp>
 
 namespace nexus {
@@ -59,12 +59,14 @@ struct SlaveOffer
 
   SlaveOffer(SlaveID _slaveId,
              const std::string& _host,
-             const string_map& _params)
-    : slaveId(_slaveId), host(_host), params(_params) {}
+             const string_map& _params,
+             const PID& _slavePid)
+    : slaveId(_slaveId), host(_host), params(_params), slavePid(_slavePid) {}
 
   SlaveID slaveId;
   std::string host;
   string_map params;
+  PID slavePid;
 };
 
 

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:30:00 2011
@@ -747,7 +747,7 @@ OfferID Master::makeOffer(Framework *fra
     Params params;
     params.set("cpus", r.resources.cpus);
     params.set("mem", r.resources.mem);
-    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
+    SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
     offers.push_back(offer);
   }
   send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -852,7 +852,7 @@ void Master::launchTask(Framework *f, co
   allocator->taskAdded(task);
   send(slave->pid, pack<M2S_RUN_TASK>(
         f->id, t.taskId, f->name, f->user, f->executorInfo,
-        t.name, t.arg, t.params));
+        t.name, t.arg, t.params, (string)f->pid));
 }
 
 

Modified: incubator/mesos/trunk/src/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.cpp (original)
+++ incubator/mesos/trunk/src/messages.cpp Sun Jun  5 03:30:00 2011
@@ -26,6 +26,7 @@ void operator & (serializer& s, const Sl
   s & offer.slaveId;
   s & offer.host;
   s & offer.params;
+  s & offer.slavePid;
 }
 
 
@@ -34,6 +35,7 @@ void operator & (deserializer& s, SlaveO
   s & offer.slaveId;
   s & offer.host;
   s & offer.params;
+  s & offer.slavePid;
 }
 
 

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 03:30:00 2011
@@ -28,6 +28,9 @@ enum MessageType {
   F2M_KILL_TASK,
   F2M_FRAMEWORK_MESSAGE,
   F2M_FT_FRAMEWORK_MESSAGE,
+
+  F2F_SLOT_OFFER_REPLY,
+  F2F_FRAMEWORK_MESSAGE,
   
   /* From master to framework. */
   M2F_REGISTER_REPLY,
@@ -162,6 +165,17 @@ TUPLE(F2M_FT_FRAMEWORK_MESSAGE,
        FrameworkID,
        FrameworkMessage));
 
+
+TUPLE(F2F_SLOT_OFFER_REPLY,
+      (OfferID,
+       std::vector<TaskDescription>,
+       Params));
+
+TUPLE(F2F_FRAMEWORK_MESSAGE,
+      (FrameworkMessage));
+
+
+
 TUPLE(M2F_REGISTER_REPLY,
       (FrameworkID));
 
@@ -273,7 +287,8 @@ TUPLE(M2S_RUN_TASK,
        ExecutorInfo,
        std::string /*taskName*/,
        std::string /*taskArgs*/,
-       Params));
+       Params,
+       std::string /*framework PID*/));
 
 TUPLE(M2S_KILL_TASK,
       (FrameworkID,

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 03:30:00 2011
@@ -78,6 +78,10 @@ private:
   LeaderDetector *leaderDetector;
   FTMessaging *ftMsg;
 
+  typedef unordered_map< SlaveID, PID > SidToPidMap;
+  unordered_map< OfferID, SidToPidMap > savedOffers;
+  SidToPidMap sidToPidMap;
+
   volatile bool terminate;
 
   class SchedLeaderListener;
@@ -212,13 +216,64 @@ protected:
         OfferID oid;
         vector<SlaveOffer> offs;
         unpack<M2F_SLOT_OFFER>(oid, offs);
+        
+        savedOffers[ oid ] = SidToPidMap();
+        SidToPidMap &tmpMap = savedOffers[ oid ];
+        foreach(const SlaveOffer &offer, offs) {
+          tmpMap[ offer.slaveId ] = offer.slavePid;
+        }
+
         invoke(bind(&Scheduler::resourceOffer, sched, driver, oid, ref(offs)));
         break;
       }
 
+      case F2F_SLOT_OFFER_REPLY: {
+        OfferID oid;
+        vector<TaskDescription> tasks;
+        Params params;
+        vector<SlaveOffer> offs;
+        unpack<F2F_SLOT_OFFER_REPLY>(oid, tasks, params);
+
+        foreach(const TaskDescription &task, tasks) {
+          sidToPidMap[ task.slaveId ] = savedOffers[ oid ][ task.slaveId ];
+
+        }
+        savedOffers.erase( oid );
+
+        if (isFT) {
+          TimeoutListener *tListener = 
+            new TimeoutListener(this, tasks);
+
+          string ftId = ftMsg->getNextId();
+          DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
+          ftMsg->reliableSend( ftId,
+                               pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
+                               tListener);
+        } else
+          send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
+
+        break;
+      }
+
+      case F2F_FRAMEWORK_MESSAGE: {
+        FrameworkMessage msg;
+        unpack<F2F_FRAMEWORK_MESSAGE>(msg);
+
+        /*
+        if (isFT) {
+          string ftId = ftMsg->getNextId();
+          ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+        } else
+          send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
+        */
+        send( sidToPidMap[ msg.slaveId ], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
+        break;
+      }
+
       case M2F_RESCIND_OFFER: {
         OfferID oid;
         unpack<M2F_RESCIND_OFFER>(oid);
+        savedOffers.erase(oid);
         invoke(bind(&Scheduler::offerRescinded, sched, driver, oid));
         break;
       }
@@ -240,7 +295,6 @@ protected:
 
 
       case M2F_STATUS_UPDATE: {
-        DLOG(INFO) << "Got M2F_STATUS_UPDATE";
         TaskID tid;
         TaskState state;
         string data;
@@ -274,6 +328,7 @@ protected:
       case M2F_LOST_SLAVE: {
         SlaveID sid;
         unpack<M2F_LOST_SLAVE>(sid);
+        sidToPidMap.erase(sid);
         invoke(bind(&Scheduler::slaveLost, sched, driver, sid));
         break;
       }
@@ -536,27 +591,8 @@ void NexusSchedulerDriver::replyToOffer(
   }
 
   // TODO(benh): Do a Process::post instead?
-
-  if (process->isFT) {
-    SchedulerProcess::TimeoutListener *tListener = 
-      new SchedulerProcess::TimeoutListener(process, tasks);
-
-    string ftId = process->ftMsg->getNextId();
-    DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
-    process->ftMsg->reliableSend( ftId,
-                                  process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
-                                                                         process->self(),
-                                                                         process->fid,
-                                                                         offerId,
-                                                                         tasks,
-                                                                         Params(params)),
-                                  tListener);
-  } else
-  process->send(process->master,
-                process->pack<F2M_SLOT_OFFER_REPLY>(process->fid,
-                                                    offerId,
-                                                    tasks,
-                                                    Params(params)));
+  
+  process->send( process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
 }
 
 
@@ -585,12 +621,7 @@ void NexusSchedulerDriver::sendFramework
     return;
   }
 
-  if (process->isFT) {
-    string ftId = process->ftMsg->getNextId();
-    process->ftMsg->reliableSend( ftId, process->pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, process->self(), process->fid, message));
-  } else
-    process->send(process->master,
-                  process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
+  process->send( process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message) );
 }
 
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 03:30:00 2011
@@ -209,10 +209,10 @@ void Slave::operator () ()
       }
       
       case M2S_RUN_TASK: {
-        string fwName, user, taskName, taskArg;
+        string fwName, user, taskName, taskArg, fwPidStr;
         ExecutorInfo execInfo;
         unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
-            taskName, taskArg, params);
+                             taskName, taskArg, params, fwPidStr);
         LOG(INFO) << "Got assigned task " << fid << ":" << tid;
         Resources res;
         res.cpus = params.getInt32("cpus", -1);
@@ -220,7 +220,10 @@ void Slave::operator () ()
         Framework *framework = getFramework(fid);
         if (framework == NULL) {
           // Framework not yet created on this node - create it
-          framework = new Framework(fid, fwName, user, execInfo);
+          PID fwPid;
+          istringstream ss(fwPidStr);
+          ss >> fwPid;
+          framework = new Framework(fid, fwName, user, execInfo, fwPid);
           frameworks[fid] = framework;
           isolationModule->frameworkAdded(framework);
           isolationModule->startExecutor(framework);
@@ -344,11 +347,14 @@ void Slave::operator () ()
         unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
         // Set slave ID in case framework omitted it
         message.slaveId = this->id;
+        /*
         if (isFT) {
           string ftId = ftMsg->getNextId();
           ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
         } else
           send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
+        */
+        send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
         break;
       }
 

Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun  5 03:30:00 2011
@@ -83,14 +83,15 @@ struct Framework
   list<TaskDescription *> queuedTasks; // Holds tasks until executor starts
   unordered_map<TaskID, TaskInfo *> tasks;
   Resources resources;
+  PID fwPid;
 
   // Information about the status of the executor for this framework, set by
   // the isolation module. For example, this might include a PID, a VM ID, etc.
   string executorStatus;
   
   Framework(FrameworkID _id, const string& _name, const string& _user,
-      const ExecutorInfo& _executorInfo)
-    : id(_id), name(_name), user(_user), executorInfo(_executorInfo) {}
+            const ExecutorInfo& _executorInfo, const PID& _fwPid)
+    : id(_id), name(_name), user(_user), executorInfo(_executorInfo), fwPid(_fwPid) {}
 
   ~Framework()
   {