You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:33:38 UTC
svn commit: r1131642 - /incubator/mesos/trunk/src/master.cpp
Author: benh
Date: Sun Jun 5 03:33:37 2011
New Revision: 1131642
URL: http://svn.apache.org/viewvc?rev=1131642&view=rev
Log:
Bug fixed in calls to accept message
Modified:
incubator/mesos/trunk/src/master.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131642&r1=1131641&r2=1131642&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:33:37 2011
@@ -1,15 +1,11 @@
-#include "config.hpp" // Need to define first to get USING_ZOOKEEPER
-
-#include <glog/logging.h>
-
-#ifdef USING_ZOOKEEPER
-#include <zookeeper.hpp>
-#endif
-
#include "allocator.hpp"
-#include "allocator_factory.hpp"
#include "master.hpp"
#include "master_webui.hpp"
+#include "allocator_factory.hpp"
+#include "ft_messaging.hpp"
+
+#include <glog/logging.h>
+
using std::endl;
using std::max;
@@ -31,56 +27,8 @@ using namespace nexus::internal;
using namespace nexus::internal::master;
-/* List of ZooKeeper host:port pairs (from master_main.cpp/local.cpp). */
-extern string zookeeper;
-
namespace {
-#ifdef USING_ZOOKEEPER
-class MasterWatcher : public Watcher
-{
-private:
- Master *master;
-
-public:
- void process(ZooKeeper *zk, int type, int state, const string &path)
- {
- if ((state == ZOO_CONNECTED_STATE) && (type == ZOO_SESSION_EVENT)) {
- // Create znode for master identification.
- string znode = "/home/nexus/master";
- string dirname = "/home/nexus";
- string delimiter = "/";
- string contents = "";
-
- int ret;
- string result;
-
- // Create directory path znodes as necessary.
- size_t index = dirname.find(delimiter, 0);
- while (index < string::npos) {
- index = dirname.find(delimiter, index+1);
- string prefix = dirname.substr(0, index);
- ret = zk->create(prefix, contents, ZOO_CREATOR_ALL_ACL,
- 0, &result);
- if (ret != ZOK && ret != ZNODEEXISTS)
- fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
- }
-
- // Now create znode.
- ret = zk->create(znode, master->getPID(), ZOO_CREATOR_ALL_ACL,
- ZOO_EPHEMERAL, &result);
-
- if (ret != ZOK)
- fatal("failed to create ZooKeeper znode! (%s)", zk->error(ret));
- } else {
- fatal("unhandled ZooKeeper event!");
- }
- }
-
- MasterWatcher(Master *_master) : master(_master) {}
-};
-#endif
-
// A process that periodically pings the master to check filter expiries, etc
class AllocatorTimer : public Tuple<Process>
{
@@ -163,24 +111,50 @@ public:
}
-Master::Master()
- : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
- allocatorType("simple")
-{}
+Master::Master(const string &zk)
+ : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
+ nextSlotOfferId(0), allocatorType("simple"), masterId(0)
+{
+ if (zk != "") {
+ pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
+ if (urlPair.first == UrlProcessor::ZOO) {
+ isFT = true;
+ zkServers = urlPair.second;
+ } else {
+ LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
+ exit(1);
+ }
+ }
+ ftMsg = FTMessaging::getInstance();
+}
-Master::Master(const string& _allocatorType)
- : nextFrameworkId(0), nextSlaveId(0), nextSlotOfferId(0),
- allocatorType(_allocatorType)
-{}
+Master::Master(const string& _allocatorType, const string &zk)
+ : leaderDetector(NULL), nextFrameworkId(0), nextSlaveId(0),
+ nextSlotOfferId(0), allocatorType(_allocatorType), masterId(0)
+{
+ if (zk != "") {
+ pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
+ if (urlPair.first == UrlProcessor::ZOO) {
+ isFT = true;
+ zkServers = urlPair.second;
+ } else {
+ LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
+ exit(1);
+ }
+ }
+ ftMsg = FTMessaging::getInstance();
+}
Master::~Master()
{
+ if (isFT && leaderDetector != NULL)
+ delete leaderDetector;
LOG(INFO) << "Shutting down master";
delete allocator;
foreachpair (_, Framework *framework, frameworks) {
- foreachpair(_, Task *task, framework->tasks)
+ foreachpair(_, TaskInfo *task, framework->tasks)
delete task;
delete framework;
}
@@ -211,7 +185,7 @@ state::MasterState * Master::getState()
f->executorInfo.uri, f->resources.cpus, f->resources.mem,
f->connectTime);
state->frameworks.push_back(framework);
- foreachpair (_, Task *t, f->tasks) {
+ foreachpair (_, TaskInfo *t, f->tasks) {
state::Task *task = new state::Task(t->id, t->name, t->frameworkId,
t->slaveId, t->state, t->resources.cpus, t->resources.mem);
framework->tasks.push_back(task);
@@ -285,10 +259,44 @@ SlotOffer * Master::lookupSlotOffer(Offe
return NULL;
}
+void Master::updateFrameworkTasks() {
+ foreachpair (SlaveID sid, Slave *slave, slaves) {
+ foreachpair (_, TaskInfo *task, slave->tasks) {
+ updateFrameworkTasks(task);
+ }
+ }
+}
+
+//alibandali++
+void Master::updateFrameworkTasks(TaskInfo *task) {
+ Framework *fwrk = lookupFramework(task->frameworkId);
+ if (fwrk != NULL) {
+ if (fwrk->tasks.find(task->id) == fwrk->tasks.end()) {
+ fwrk->tasks[task->id] = task;
+ // this->resources += resources; // alig: not sure if this should be done or not
+ }
+ }
+}
+
void Master::operator () ()
{
- LOG(INFO) << "Master started at " << self();
+ LOG(INFO) << "Master started at nexus://" << self();
+
+ if (isFT) {
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ ostringstream lpid;
+ lpid << self();
+ leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
+
+ string myLeaderSeq = leaderDetector->getMySeq();
+ if (myLeaderSeq == "") {
+ LOG(FATAL) << "Cannot proceed since new FT master sequence number could not be fetched from ZK.";
+ exit(1);
+ }
+ masterId = lexical_cast<long>(myLeaderSeq);
+ LOG(INFO) << "Master ID:" << masterId;
+ }
allocator = createAllocator();
if (!allocator)
@@ -297,21 +305,16 @@ void Master::operator () ()
link(spawn(new AllocatorTimer(self())));
//link(spawn(new SharesPrinter(self())));
-#ifdef USING_ZOOKEEPER
- ZooKeeper *zk;
- if (!zookeeper.empty())
- zk = new ZooKeeper(zookeeper, 10000, new MasterWatcher(this));
-#endif
-
while (true) {
switch (receive()) {
case F2M_REGISTER_FRAMEWORK: {
- FrameworkID fid = nextFrameworkId++;
+ FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+
Framework *framework = new Framework(from(), fid);
unpack<F2M_REGISTER_FRAMEWORK>(framework->name,
- framework->user,
- framework->executorInfo);
+ framework->user,
+ framework->executorInfo);
LOG(INFO) << "Registering " << framework << " at " << framework->pid;
frameworks[fid] = framework;
pidToFid[framework->pid] = fid;
@@ -323,13 +326,75 @@ void Master::operator () ()
break;
}
+ case F2M_REREGISTER_FRAMEWORK: {
+
+ Framework *framework = new Framework(from());
+ unpack<F2M_REREGISTER_FRAMEWORK>(framework->id,
+ framework->name,
+ framework->user,
+ framework->executorInfo);
+
+ if (framework->id == "") {
+ DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
+ framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+ }
+
+ LOG(INFO) << "Registering " << framework << " at " << framework->pid;
+ frameworks[framework->id] = framework;
+ pidToFid[framework->pid] = framework->id;
+
+ updateFrameworkTasks();
+
+ link(framework->pid);
+ send(framework->pid, pack<M2F_REGISTER_REPLY>(framework->id));
+ allocator->frameworkAdded(framework);
+ if (framework->executorInfo.uri == "")
+ terminateFramework(framework, 1, "No executor URI given");
+
+ DLOG(INFO) << "STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
+
+ break;
+ }
+
case F2M_UNREGISTER_FRAMEWORK: {
FrameworkID fid;
unpack<F2M_UNREGISTER_FRAMEWORK>(fid);
LOG(INFO) << "Asked to unregister framework " << fid;
Framework *framework = lookupFramework(fid);
if (framework != NULL)
- removeFramework(framework);
+ removeFramework(framework);
+ break;
+ }
+
+ case F2M_FT_SLOT_OFFER_REPLY: {
+ FrameworkID fid;
+ OfferID oid;
+ vector<TaskDescription> tasks;
+ Params params;
+ string ftId, senderStr;
+ unpack<F2M_FT_SLOT_OFFER_REPLY>(ftId, senderStr, fid, oid, tasks, params);
+ PID senderPid;
+ istringstream ss(senderStr);
+ ss >> senderPid;
+ if (!ftMsg->acceptMessageAckTo(ftId, senderPid, senderStr)) {
+ LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+ break;
+ }
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+ SlotOffer *offer = lookupSlotOffer(oid);
+ if (offer != NULL) {
+ processOfferReply(offer, tasks, params);
+ } else {
+ // The slot offer is gone, meaning that we rescinded it or that
+ // the slave was lost; immediately report any tasks in it as lost
+ foreach (TaskDescription &t, tasks) {
+ send(framework->pid,
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ }
+ }
+ } else
+ DLOG(INFO) << "F2M_FT_SLOT_OFFER_REPLY error: couldn't lookup framework id" << fid;
break;
}
@@ -341,17 +406,17 @@ void Master::operator () ()
unpack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- SlotOffer *offer = lookupSlotOffer(oid);
- if (offer != NULL) {
- processOfferReply(offer, tasks, params);
- } else {
- // The slot offer is gone, meaning that we rescinded it or that
- // the slave was lost; immediately report any tasks in it as lost
- foreach (TaskDescription &t, tasks) {
- send(framework->pid,
- pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
- }
- }
+ SlotOffer *offer = lookupSlotOffer(oid);
+ if (offer != NULL) {
+ processOfferReply(offer, tasks, params);
+ } else {
+ // The slot offer is gone, meaning that we rescinded it or that
+ // the slave was lost; immediately report any tasks in it as lost
+ foreach (TaskDescription &t, tasks) {
+ send(framework->pid,
+ pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
+ }
+ }
}
break;
}
@@ -361,9 +426,9 @@ void Master::operator () ()
unpack<F2M_REVIVE_OFFERS>(fid);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- LOG(INFO) << "Reviving offers for " << framework;
- framework->slaveFilter.clear();
- allocator->offersRevived(framework);
+ LOG(INFO) << "Reviving offers for " << framework;
+ framework->slaveFilter.clear();
+ allocator->offersRevived(framework);
}
break;
}
@@ -374,34 +439,39 @@ void Master::operator () ()
unpack<F2M_KILL_TASK>(fid, tid);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- Task *task = framework->lookupTask(tid);
- if (task != NULL) {
- LOG(INFO) << "Asked to kill " << task << " by its framework";
- killTask(task);
- }
+ TaskInfo *task = framework->lookupTask(tid);
+ if (task != NULL) {
+ LOG(INFO) << "Asked to kill " << task << " by its framework";
+ killTask(task);
+ }
}
break;
}
- case F2M_FRAMEWORK_MESSAGE: {
+ case F2M_FT_FRAMEWORK_MESSAGE: {
FrameworkID fid;
FrameworkMessage message;
- unpack<F2M_FRAMEWORK_MESSAGE>(fid, message);
+ string ftId, senderStr;
+ unpack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message);
Framework *framework = lookupFramework(fid);
if (framework != NULL) {
- Slave *slave = lookupSlave(message.slaveId);
- if (slave != NULL) {
- LOG(INFO) << "Sending framework message to " << slave;
- send(slave->pid, pack<M2S_FRAMEWORK_MESSAGE>(fid, message));
- }
- }
+ Slave *slave = lookupSlave(message.slaveId);
+ if (slave != NULL) {
+ LOG(INFO) << "Sending framework message to " << slave;
+ send(slave->pid, pack<M2S_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, fid, message));
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << message.slaveId;
break;
}
case S2M_REGISTER_SLAVE: {
- Slave *slave = new Slave(from(), nextSlaveId++);
+ string slaveId = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+
+ Slave *slave = new Slave(from(), slaveId);
unpack<S2M_REGISTER_SLAVE>(slave->hostname, slave->publicDns,
- slave->resources);
+ slave->resources);
LOG(INFO) << "Registering " << slave << " at " << slave->pid;
slaves[slave->id] = slave;
pidToSid[slave->pid] = slave->id;
@@ -411,29 +481,70 @@ void Master::operator () ()
break;
}
+ case S2M_REREGISTER_SLAVE: {
+ Slave *slave = new Slave(from());
+ vector<TaskInfo> taskVec;
+
+ unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
+ slave->resources, taskVec);
+
+ if (slave->id == "") {
+ slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+ DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
+ }
+
+ foreach(TaskInfo &ti, taskVec) {
+ TaskInfo *tip = new TaskInfo(ti);
+ slave->addTask(tip);
+ updateFrameworkTasks(tip);
+ }
+
+ //alibandali
+ LOG(INFO) << "Re-registering " << slave << " at " << slave->pid;
+ slaves[slave->id] = slave;
+ pidToSid[slave->pid] = slave->id;
+ link(slave->pid);
+ send(slave->pid, pack<M2S_REREGISTER_REPLY>(slave->id));
+ allocator->slaveAdded(slave);
+
+ DLOG(INFO) << "STAT: Slave count: " << slaves.size() << " Framework count: " << frameworks.size();
+
+ break;
+ }
+
case S2M_UNREGISTER_SLAVE: {
SlaveID sid;
unpack<S2M_UNREGISTER_SLAVE>(sid);
LOG(INFO) << "Asked to unregister slave " << sid;
Slave *slave = lookupSlave(sid);
if (slave != NULL)
- removeSlave(slave);
+ removeSlave(slave);
break;
}
- case S2M_STATUS_UPDATE: {
+ case S2M_FT_STATUS_UPDATE: {
SlaveID sid;
FrameworkID fid;
TaskID tid;
TaskState state;
string data;
- unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
+ string ftId, senderStr;
+
+ unpack<S2M_FT_STATUS_UPDATE>(ftId, senderStr, sid, fid, tid, state, data);
+ DLOG(INFO) << "FT: prepare relay ftId:"<< ftId << " from: "<< senderStr;
if (Slave *slave = lookupSlave(sid)) {
- if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework
- send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
+ if (Framework *framework = lookupFramework(fid)) {
+ // Pass on the status update to the framework
+
+ DLOG(INFO) << "FT: relaying ftId:"<< ftId << " from: "<< senderStr;
+ send(framework->pid, pack<M2F_FT_STATUS_UPDATE>(ftId, senderStr, tid, state, data));
+
+ if (!ftMsg->acceptMessage(ftId, senderStr)) {
+ LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << ftId;
+ break;
+ }
// Update the task state locally
- Task *task = slave->lookupTask(fid, tid);
+ TaskInfo *task = slave->lookupTask(fid, tid);
if (task != NULL) {
LOG(INFO) << "Status update: " << task << " is in state " << state;
task->state = state;
@@ -444,11 +555,65 @@ void Master::operator () ()
removeTask(task, TRR_TASK_ENDED);
}
}
- }
- break;
- }
+ } else
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
+
+ break;
+ }
+
+ case S2M_STATUS_UPDATE: {
+ SlaveID sid;
+ FrameworkID fid;
+ TaskID tid;
+ TaskState state;
+ string data;
+ unpack<S2M_STATUS_UPDATE>(sid, fid, tid, state, data);
+ if (Slave *slave = lookupSlave(sid)) {
+ if (Framework *framework = lookupFramework(fid)) {
+ // Pass on the status update to the framework
+ send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, state, data));
+ // Update the task state locally
+ TaskInfo *task = slave->lookupTask(fid, tid);
+ if (task != NULL) {
+ LOG(INFO) << "Status update: " << task << " is in state " << state;
+ task->state = state;
+ // Remove the task if it finished or failed
+ if (state == TASK_FINISHED || state == TASK_FAILED ||
+ state == TASK_KILLED || state == TASK_LOST) {
+ LOG(INFO) << "Removing " << task << " because it's done";
+ removeTask(task, TRR_TASK_ENDED);
+ }
+ }
+ } else
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_STATUS_UPDATE error: couldn't lookup slave id" << sid;
+ break;
}
+ case S2M_FT_FRAMEWORK_MESSAGE: {
+ SlaveID sid;
+ FrameworkID fid;
+ FrameworkMessage message;
+ string ftId, senderStr;
+ unpack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, sid, fid, message);
+ Slave *slave = lookupSlave(sid);
+ if (slave != NULL) {
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+
+ send(framework->pid, pack<M2F_FT_FRAMEWORK_MESSAGE>(ftId, senderStr, message));
+
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup framework id" << fid;
+ } else
+ DLOG(INFO) << "S2M_FT_FRAMEWORK_MESSAGE error: couldn't lookup slave id" << sid;
+
+ break;
+ }
+
case S2M_FRAMEWORK_MESSAGE: {
SlaveID sid;
FrameworkID fid;
@@ -456,9 +621,9 @@ void Master::operator () ()
unpack<S2M_FRAMEWORK_MESSAGE>(sid, fid, message);
Slave *slave = lookupSlave(sid);
if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL)
- send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL)
+ send(framework->pid, pack<M2F_FRAMEWORK_MESSAGE>(message));
}
break;
}
@@ -470,18 +635,18 @@ void Master::operator () ()
unpack<S2M_LOST_EXECUTOR>(sid, fid, status);
Slave *slave = lookupSlave(sid);
if (slave != NULL) {
- Framework *framework = lookupFramework(fid);
- if (framework != NULL) {
- ostringstream oss;
- if (status == -1) {
- oss << "Executor on " << slave << " (" << slave->hostname
- << ") disconnected";
- } else {
- oss << "Executor on " << slave << " (" << slave->hostname
- << ") exited with status " << status;
- }
- terminateFramework(framework, status, oss.str());
- }
+ Framework *framework = lookupFramework(fid);
+ if (framework != NULL) {
+ ostringstream oss;
+ if (status == -1) {
+ oss << "Executor on " << slave << " (" << slave->hostname
+ << ") disconnected";
+ } else {
+ oss << "Executor on " << slave << " (" << slave->hostname
+ << ") exited with status " << status;
+ }
+ terminateFramework(framework, status, oss.str());
+ }
}
break;
}
@@ -491,19 +656,43 @@ void Master::operator () ()
unpack<SH2M_HEARTBEAT>(sid);
Slave *slave = lookupSlave(sid);
if (slave != NULL)
- //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
- ;
+ //LOG(INFO) << "Received heartbeat for " << slave << " from " << from();
+ ;
else
- LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
- << " from " << from();
+ LOG(WARNING) << "Received heartbeat for UNKNOWN slave " << sid
+ << " from " << from();
break;
}
case M2M_TIMER_TICK: {
LOG(INFO) << "Allocator timer tick";
foreachpair (_, Framework *framework, frameworks)
- framework->removeExpiredFilters();
+ framework->removeExpiredFilters();
allocator->timerTick();
+
+ // int cnts = 0;
+ // foreachpair(_, Framework *framework, frameworks) {
+ // DLOG(INFO) << (cnts++) << " resourceInUse:" << framework->resources;
+ // }
+ break;
+ }
+
+ case FT_RELAY_ACK: {
+ string ftId;
+ string origPidStr;
+ unpack<FT_RELAY_ACK>(ftId, origPidStr);
+
+ DLOG(INFO) << "FT_RELAY_ACK for " << ftId << " forwarding it to " << origPidStr;
+
+ PID origPid;
+ istringstream iss(origPidStr);
+ if (!(iss >> origPid)) {
+ cerr << "FT: Failed to resolve PID for originator: " << origPidStr << endl;
+ break;
+ }
+
+ send(origPid, pack<FT_RELAY_ACK>(ftId, origPidStr));
+
break;
}
@@ -548,7 +737,8 @@ void Master::operator () ()
OfferID Master::makeOffer(Framework *framework,
const vector<SlaveResources>& resources)
{
- OfferID oid = nextSlotOfferId++;
+ OfferID oid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlotOfferId++);
+
SlotOffer *offer = new SlotOffer(oid, framework->id, resources);
slotOffers[offer->id] = offer;
framework->addOffer(offer);
@@ -562,7 +752,7 @@ OfferID Master::makeOffer(Framework *fra
Params params;
params.set("cpus", r.resources.cpus);
params.set("mem", r.resources.mem);
- SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap());
+ SlaveOffer offer(r.slave->id, r.slave->hostname, params.getMap(), r.slave->pid);
offers.push_back(offer);
}
send(framework->pid, pack<M2F_SLOT_OFFER>(oid, offers));
@@ -661,13 +851,13 @@ void Master::launchTask(Framework *f, co
Resources res(params.getInt32("cpus", -1),
params.getInt64("mem", -1));
Slave *slave = lookupSlave(t.slaveId);
- Task *task = f->addTask(t.taskId, t.name, slave->id, res);
+ TaskInfo *task = f->addTask(t.taskId, t.name, slave->id, res);
LOG(INFO) << "Launching " << task << " on " << slave;
slave->addTask(task);
allocator->taskAdded(task);
send(slave->pid, pack<M2S_RUN_TASK>(
f->id, t.taskId, f->name, f->user, f->executorInfo,
- t.name, t.arg, t.params));
+ t.name, t.arg, t.params, (string)f->pid));
}
@@ -677,7 +867,7 @@ void Master::rescindOffer(SlotOffer *off
}
-void Master::killTask(Task *task)
+void Master::killTask(TaskInfo *task)
{
LOG(FATAL) << "Killing " << task;
Framework *framework = lookupFramework(task->frameworkId);
@@ -746,8 +936,8 @@ void Master::removeFramework(Framework *
send(slave->pid, pack<M2S_KILL_FRAMEWORK>(framework->id));
// Remove pointers to the framework's tasks in slaves
- unordered_map<TaskID, Task *> tasksCopy = framework->tasks;
- foreachpair (_, Task *task, tasksCopy) {
+ unordered_map<TaskID, TaskInfo *> tasksCopy = framework->tasks;
+ foreachpair (_, TaskInfo *task, tasksCopy) {
Slave *slave = lookupSlave(task->slaveId);
CHECK(slave != NULL);
removeTask(task, TRR_FRAMEWORK_LOST);
@@ -776,8 +966,8 @@ void Master::removeSlave(Slave *slave)
// TODO: Notify allocator that a slave removal is beginning?
// Remove pointers to slave's tasks in frameworks, and send status updates
- unordered_map<pair<FrameworkID, TaskID>, Task *> tasksCopy = slave->tasks;
- foreachpair (_, Task *task, tasksCopy) {
+ unordered_map<pair<FrameworkID, TaskID>, TaskInfo *> tasksCopy = slave->tasks;
+ foreachpair (_, TaskInfo *task, tasksCopy) {
Framework *framework = lookupFramework(task->frameworkId);
CHECK(framework != NULL);
send(framework->pid, pack<M2F_STATUS_UPDATE>(task->id, TASK_LOST,
@@ -818,7 +1008,7 @@ void Master::removeSlave(Slave *slave)
// Remove a slot offer (because it was replied or we lost a framework or slave)
-void Master::removeTask(Task *task, TaskRemovalReason reason)
+void Master::removeTask(TaskInfo *task, TaskRemovalReason reason)
{
Framework *framework = lookupFramework(task->frameworkId);
Slave *slave = lookupSlave(task->slaveId);