You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:30:01 UTC
svn commit: r1131614 - in /incubator/mesos/trunk: include/nexus.hpp
src/master.cpp src/messages.cpp src/messages.hpp src/nexus_sched.cpp
src/slave.cpp src/slave.hpp
Author: benh
Date: Sun Jun 5 03:30:00 2011
New Revision: 1131614
URL: http://svn.apache.org/viewvc?rev=1131614&view=rev
Log:
NOTE: this commit may need to be removed. It makes FrameworkMessages travel directly between slaves and frameworks instead of being relayed through the master.
The change requires SchedulerProcess to save each offer together with the PIDs of the offering slaves. When the framework replies to an offer, the PIDs of the slaves on which it launched tasks are saved in a map and the saved offer is deleted. As a result, the scheduler can look up a slave's PID directly and send it framework messages. Similarly, the slave's Framework struct now also stores the framework's PID, which the master includes when it sends M2S_RUN_TASK. As a result, a slave can send framework messages directly to the scheduler.
Modified:
incubator/mesos/trunk/include/nexus.hpp
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/messages.cpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/slave.hpp
Modified: incubator/mesos/trunk/include/nexus.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/include/nexus.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/include/nexus.hpp (original)
+++ incubator/mesos/trunk/include/nexus.hpp Sun Jun 5 03:30:00 2011
@@ -3,7 +3,7 @@
#include <map>
#include <string>
-
+#include <process.hpp>
#include <nexus_types.hpp>
namespace nexus {
@@ -59,12 +59,14 @@ struct SlaveOffer
SlaveOffer(SlaveID _slaveId,
const std::string& _host,
- const string_map& _params)
- : slaveId(_slaveId), host(_host), params(_params) {}
+ const string_map& _params,
+ const PID& _slavePid)
+ : slaveId(_slaveId), host(_host), params(_params), slavePid(_slavePid) {}
SlaveID slaveId;
std::string host;
string_map params;
+ PID slavePid;
};
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:30:00 2011
@@ -747,7 +747,7 @@ OfferID Master::makeOffer(Framework *fra
Params params;
params.set("cpus", r.resources.cpus);
params.set("mem", r.resources.mem);
- SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
+ SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
offers.push_back(offer);
}
send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -852,7 +852,7 @@ void Master::launchTask(Framework *f, co
allocator->taskAdded(task);
send(slave->pid, pack<M2S_RUN_TASK>(
f->id, t.taskId, f->name, f->user, f->executorInfo,
- t.name, t.arg, t.params));
+ t.name, t.arg, t.params, (string)f->pid));
}
Modified: incubator/mesos/trunk/src/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.cpp (original)
+++ incubator/mesos/trunk/src/messages.cpp Sun Jun 5 03:30:00 2011
@@ -26,6 +26,7 @@ void operator & (serializer& s, const Sl
s & offer.slaveId;
s & offer.host;
s & offer.params;
+ s & offer.slavePid;
}
@@ -34,6 +35,7 @@ void operator & (deserializer& s, SlaveO
s & offer.slaveId;
s & offer.host;
s & offer.params;
+ s & offer.slavePid;
}
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 03:30:00 2011
@@ -28,6 +28,9 @@ enum MessageType {
F2M_KILL_TASK,
F2M_FRAMEWORK_MESSAGE,
F2M_FT_FRAMEWORK_MESSAGE,
+
+ F2F_SLOT_OFFER_REPLY,
+ F2F_FRAMEWORK_MESSAGE,
/* From master to framework. */
M2F_REGISTER_REPLY,
@@ -162,6 +165,17 @@ TUPLE(F2M_FT_FRAMEWORK_MESSAGE,
FrameworkID,
FrameworkMessage));
+
+TUPLE(F2F_SLOT_OFFER_REPLY,
+ (OfferID,
+ std::vector<TaskDescription>,
+ Params));
+
+TUPLE(F2F_FRAMEWORK_MESSAGE,
+ (FrameworkMessage));
+
+
+
TUPLE(M2F_REGISTER_REPLY,
(FrameworkID));
@@ -273,7 +287,8 @@ TUPLE(M2S_RUN_TASK,
ExecutorInfo,
std::string /*taskName*/,
std::string /*taskArgs*/,
- Params));
+ Params,
+ std::string /*framework PID*/));
TUPLE(M2S_KILL_TASK,
(FrameworkID,
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 03:30:00 2011
@@ -78,6 +78,10 @@ private:
LeaderDetector *leaderDetector;
FTMessaging *ftMsg;
+ typedef unordered_map< SlaveID, PID > SidToPidMap;
+ unordered_map< OfferID, SidToPidMap > savedOffers;
+ SidToPidMap sidToPidMap;
+
volatile bool terminate;
class SchedLeaderListener;
@@ -212,13 +216,64 @@ protected:
OfferID oid;
vector<SlaveOffer> offs;
unpack<M2F_SLOT_OFFER>(oid, offs);
+
+ savedOffers[ oid ] = SidToPidMap();
+ SidToPidMap &tmpMap = savedOffers[ oid ];
+ foreach(const SlaveOffer &offer, offs) {
+ tmpMap[ offer.slaveId ] = offer.slavePid;
+ }
+
invoke(bind(&Scheduler::resourceOffer, sched, driver, oid, ref(offs)));
break;
}
+ case F2F_SLOT_OFFER_REPLY: {
+ OfferID oid;
+ vector<TaskDescription> tasks;
+ Params params;
+ vector<SlaveOffer> offs;
+ unpack<F2F_SLOT_OFFER_REPLY>(oid, tasks, params);
+
+ foreach(const TaskDescription &task, tasks) {
+ sidToPidMap[ task.slaveId ] = savedOffers[ oid ][ task.slaveId ];
+
+ }
+ savedOffers.erase( oid );
+
+ if (isFT) {
+ TimeoutListener *tListener =
+ new TimeoutListener(this, tasks);
+
+ string ftId = ftMsg->getNextId();
+ DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
+ ftMsg->reliableSend( ftId,
+ pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
+ tListener);
+ } else
+ send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
+
+ break;
+ }
+
+ case F2F_FRAMEWORK_MESSAGE: {
+ FrameworkMessage msg;
+ unpack<F2F_FRAMEWORK_MESSAGE>(msg);
+
+ /*
+ if (isFT) {
+ string ftId = ftMsg->getNextId();
+ ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+ } else
+ send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
+ */
+ send( sidToPidMap[ msg.slaveId ], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
+ break;
+ }
+
case M2F_RESCIND_OFFER: {
OfferID oid;
unpack<M2F_RESCIND_OFFER>(oid);
+ savedOffers.erase(oid);
invoke(bind(&Scheduler::offerRescinded, sched, driver, oid));
break;
}
@@ -240,7 +295,6 @@ protected:
case M2F_STATUS_UPDATE: {
- DLOG(INFO) << "Got M2F_STATUS_UPDATE";
TaskID tid;
TaskState state;
string data;
@@ -274,6 +328,7 @@ protected:
case M2F_LOST_SLAVE: {
SlaveID sid;
unpack<M2F_LOST_SLAVE>(sid);
+ sidToPidMap.erase(sid);
invoke(bind(&Scheduler::slaveLost, sched, driver, sid));
break;
}
@@ -536,27 +591,8 @@ void NexusSchedulerDriver::replyToOffer(
}
// TODO(benh): Do a Process::post instead?
-
- if (process->isFT) {
- SchedulerProcess::TimeoutListener *tListener =
- new SchedulerProcess::TimeoutListener(process, tasks);
-
- string ftId = process->ftMsg->getNextId();
- DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
- process->ftMsg->reliableSend( ftId,
- process->pack<F2M_FT_SLOT_OFFER_REPLY>(ftId,
- process->self(),
- process->fid,
- offerId,
- tasks,
- Params(params)),
- tListener);
- } else
- process->send(process->master,
- process->pack<F2M_SLOT_OFFER_REPLY>(process->fid,
- offerId,
- tasks,
- Params(params)));
+
+ process->send( process->self(), process->pack<F2F_SLOT_OFFER_REPLY>(offerId, tasks, Params(params)));
}
@@ -585,12 +621,7 @@ void NexusSchedulerDriver::sendFramework
return;
}
- if (process->isFT) {
- string ftId = process->ftMsg->getNextId();
- process->ftMsg->reliableSend( ftId, process->pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, process->self(), process->fid, message));
- } else
- process->send(process->master,
- process->pack<F2M_FRAMEWORK_MESSAGE>(process->fid, message));
+ process->send( process->self(), process->pack<F2F_FRAMEWORK_MESSAGE>(message) );
}
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 03:30:00 2011
@@ -209,10 +209,10 @@ void Slave::operator () ()
}
case M2S_RUN_TASK: {
- string fwName, user, taskName, taskArg;
+ string fwName, user, taskName, taskArg, fwPidStr;
ExecutorInfo execInfo;
unpack<M2S_RUN_TASK>(fid, tid, fwName, user, execInfo,
- taskName, taskArg, params);
+ taskName, taskArg, params, fwPidStr);
LOG(INFO) << "Got assigned task " << fid << ":" << tid;
Resources res;
res.cpus = params.getInt32("cpus", -1);
@@ -220,7 +220,10 @@ void Slave::operator () ()
Framework *framework = getFramework(fid);
if (framework == NULL) {
// Framework not yet created on this node - create it
- framework = new Framework(fid, fwName, user, execInfo);
+ PID fwPid;
+ istringstream ss(fwPidStr);
+ ss >> fwPid;
+ framework = new Framework(fid, fwName, user, execInfo, fwPid);
frameworks[fid] = framework;
isolationModule->frameworkAdded(framework);
isolationModule->startExecutor(framework);
@@ -344,11 +347,14 @@ void Slave::operator () ()
unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
// Set slave ID in case framework omitted it
message.slaveId = this->id;
+ /*
if (isFT) {
string ftId = ftMsg->getNextId();
ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
} else
send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
+ */
+ send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
break;
}
Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131614&r1=1131613&r2=1131614&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun 5 03:30:00 2011
@@ -83,14 +83,15 @@ struct Framework
list<TaskDescription *> queuedTasks; // Holds tasks until executor starts
unordered_map<TaskID, TaskInfo *> tasks;
Resources resources;
+ PID fwPid;
// Information about the status of the executor for this framework, set by
// the isolation module. For example, this might include a PID, a VM ID, etc.
string executorStatus;
Framework(FrameworkID _id, const string& _name, const string& _user,
- const ExecutorInfo& _executorInfo)
- : id(_id), name(_name), user(_user), executorInfo(_executorInfo) {}
+ const ExecutorInfo& _executorInfo, const PID& _fwPid)
+ : id(_id), name(_name), user(_user), executorInfo(_executorInfo), fwPid(_fwPid) {}
~Framework()
{