You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 10:57:09 UTC
svn commit: r1132194 - in /incubator/mesos/trunk/src: messaging/messages.hpp
sched/sched.cpp
Author: benh
Date: Sun Jun 5 08:57:08 2011
New Revision: 1132194
URL: http://svn.apache.org/viewvc?rev=1132194&view=rev
Log:
Updated scheduler component to use asynchronous function dispatch for local "messages".
Modified:
incubator/mesos/trunk/src/messaging/messages.hpp
incubator/mesos/trunk/src/sched/sched.cpp
Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132194&r1=1132193&r2=1132194&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun 5 08:57:08 2011
@@ -40,8 +40,6 @@ enum MessageType {
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
- F2F_SLOT_OFFER_REPLY,
- F2F_FRAMEWORK_MESSAGE,
F2F_TASK_RUNNING_STATUS,
/* From master to framework. */
@@ -236,14 +234,6 @@ TUPLE(F2M_FRAMEWORK_MESSAGE,
(FrameworkID,
FrameworkMessage));
-TUPLE(F2F_SLOT_OFFER_REPLY,
- (OfferID,
- std::vector<TaskDescription>,
- Params));
-
-TUPLE(F2F_FRAMEWORK_MESSAGE,
- (FrameworkMessage));
-
TUPLE(F2F_TASK_RUNNING_STATUS,
());
Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132194&r1=1132193&r2=1132194&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun 5 08:57:08 2011
@@ -105,25 +105,6 @@ private:
class SchedulerProcess : public MesosProcess
{
public:
- friend class mesos::MesosSchedulerDriver;
-
-private:
- MesosSchedulerDriver* driver;
- Scheduler* sched;
- FrameworkID fid;
- string frameworkName;
- ExecutorInfo execInfo;
- int32_t generation;
- PID master;
-
- volatile bool terminate;
-
- unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
- unordered_map<SlaveID, PID> savedSlavePids;
-
- unordered_map<TaskID, RbReply *> rbReplies;
-
-public:
SchedulerProcess(MesosSchedulerDriver* _driver,
Scheduler* _sched,
FrameworkID _fid,
@@ -171,7 +152,7 @@ protected:
// send a message rather than have a timeout (see the comment
// above for why sending a message will still require us to use
// the terminate flag).
- switch (receive(2)) {
+ switch (serve(2)) {
case NEW_MASTER_DETECTED: {
string masterSeq;
@@ -228,47 +209,6 @@ protected:
break;
}
- case F2F_SLOT_OFFER_REPLY: {
- OfferID oid;
- vector<TaskDescription> tasks;
- Params params;
- tie(oid, tasks, params) = unpack<F2F_SLOT_OFFER_REPLY>(body());
-
- // Keep only the slave PIDs where we run tasks so we can send
- // framework messages directly.
- foreach(const TaskDescription &task, tasks) {
- savedSlavePids[task.slaveId] = savedOffers[oid][task.slaveId];
- }
-
- // Remove the offer since we saved all the PIDs we might use.
- savedOffers.erase(oid);
-
- foreach(const TaskDescription &task, tasks) {
- RbReply *rr = new RbReply(self(), task.taskId);
- rbReplies[task.taskId] = rr;
- // TODO(benh): Link?
- spawn(rr);
- }
-
- send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
- break;
- }
-
- case F2F_FRAMEWORK_MESSAGE: {
- FrameworkMessage msg;
- tie(msg) = unpack<F2F_FRAMEWORK_MESSAGE>(body());
- VLOG(1) << "Asked to send framework message to slave " << msg.slaveId;
- if (savedSlavePids.count(msg.slaveId) > 0 &&
- savedSlavePids[msg.slaveId] != PID()) {
- VLOG(1) << "Saved slave PID is " << savedSlavePids[msg.slaveId];
- send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
- } else {
- VLOG(1) << "No PID is saved for that slave; sending through master";
- send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
- }
- break;
- }
-
case M2F_RESCIND_OFFER: {
OfferID oid;
tie(oid) = unpack<M2F_RESCIND_OFFER>(body());
@@ -376,6 +316,77 @@ protected:
}
}
}
+
+ void stop()
+ {
+ send(master, pack<F2M_UNREGISTER_FRAMEWORK>(fid));
+ }
+
+ void killTask(TaskID tid)
+ {
+ send(master, pack<F2M_KILL_TASK>(fid, tid));
+ }
+
+ void replyToOffer(OfferID offerId,
+ const vector<TaskDescription>& tasks,
+ const map<std::string, std::string>& params)
+ {
+ // Keep only the slave PIDs where we run tasks so we can send
+ // framework messages directly.
+ foreach(const TaskDescription &task, tasks) {
+ savedSlavePids[task.slaveId] = savedOffers[offerId][task.slaveId];
+ }
+
+ // Remove the offer since we saved all the PIDs we might use.
+ savedOffers.erase(offerId);
+
+ foreach(const TaskDescription& task, tasks) {
+ RbReply *rr = new RbReply(self(), task.taskId);
+ rbReplies[task.taskId] = rr;
+ // TODO(benh): Link?
+ spawn(rr);
+ }
+
+ send(master,
+ pack<F2M_SLOT_OFFER_REPLY>(fid, offerId, tasks, Params(params)));
+ }
+
+ void reviveOffers()
+ {
+ send(master, pack<F2M_REVIVE_OFFERS>(fid));
+ }
+
+ void sendFrameworkMessage(const FrameworkMessage& message)
+ {
+ VLOG(1) << "Asked to send framework message to slave " << message.slaveId;
+ if (savedSlavePids.count(message.slaveId) > 0 &&
+ savedSlavePids[message.slaveId] != PID()) {
+ VLOG(1) << "Saved slave PID is " << savedSlavePids[message.slaveId];
+ send(savedSlavePids[message.slaveId],
+ pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
+ } else {
+ VLOG(1) << "No PID is saved for that slave; sending through master";
+ send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, message));
+ }
+ }
+
+private:
+ friend class mesos::MesosSchedulerDriver;
+
+ MesosSchedulerDriver* driver;
+ Scheduler* sched;
+ FrameworkID fid;
+ string frameworkName;
+ ExecutorInfo execInfo;
+ int32_t generation;
+ PID master;
+
+ volatile bool terminate;
+
+ unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+ unordered_map<SlaveID, PID> savedSlavePids;
+
+ unordered_map<TaskID, RbReply *> rbReplies;
};
}} /* namespace mesos { namespace internal { */
@@ -581,9 +592,7 @@ int MesosSchedulerDriver::stop()
return -1;
}
- // TODO(benh): Do a Process::post instead?
- process->send(process->master,
- pack<F2M_UNREGISTER_FRAMEWORK>(process->fid));
+ Process::dispatch(process, &SchedulerProcess::stop);
process->terminate = true;
@@ -621,10 +630,7 @@ int MesosSchedulerDriver::killTask(TaskI
return -1;
}
- // TODO(benh): Do a Process::post instead?
-
- process->send(process->master,
- pack<F2M_KILL_TASK>(process->fid, tid));
+ Process::dispatch(process, &SchedulerProcess::killTask, tid);
return 0;
}
@@ -641,10 +647,8 @@ int MesosSchedulerDriver::replyToOffer(O
return -1;
}
- // TODO(benh): Do a Process::post instead?
-
- process->send(process->self(),
- pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
+ Process::dispatch(process, &SchedulerProcess::replyToOffer,
+ offerId, tasks, params);
return 0;
}
@@ -659,16 +663,13 @@ int MesosSchedulerDriver::reviveOffers()
return -1;
}
- // TODO(benh): Do a Process::post instead?
-
- process->send(process->master,
- pack<F2M_REVIVE_OFFERS>(process->fid));
+ Process::dispatch(process, &SchedulerProcess::reviveOffers);
return 0;
}
-int MesosSchedulerDriver::sendFrameworkMessage(const FrameworkMessage &message)
+int MesosSchedulerDriver::sendFrameworkMessage(const FrameworkMessage& message)
{
Lock lock(&mutex);
@@ -677,9 +678,7 @@ int MesosSchedulerDriver::sendFrameworkM
return -1;
}
- // TODO(benh): Do a Process::post instead?
-
- process->send(process->self(), pack<F2F_FRAMEWORK_MESSAGE>(message));
+ Process::dispatch(process, &SchedulerProcess::sendFrameworkMessage, message);
return 0;
}