You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 08:42:10 UTC
svn commit: r1131961 - in /incubator/mesos/trunk/src: master.cpp master.hpp messages.hpp slave.cpp
Author: benh
Date: Sun Jun 5 06:42:10 2011
New Revision: 1131961
URL: http://svn.apache.org/viewvc?rev=1131961&view=rev
Log:
Manual merge of a few minor changes made while running at Twitter.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/master.hpp
incubator/mesos/trunk/src/messages.hpp
incubator/mesos/trunk/src/slave.cpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 06:42:10 2011
@@ -35,7 +35,7 @@ namespace {
class AllocatorTimer : public Tuple<Process>
{
private:
- PID master;
+ const PID master;
protected:
void operator () ()
@@ -271,18 +271,13 @@ void Master::operator () ()
LOG(INFO) << "Master started at mesos://" << self();
// Don't do anything until we get an identifier.
- while (true) {
- if (receive() == GOT_MASTER_ID) {
- string id;
- unpack<GOT_MASTER_ID>(id);
- masterId = lexical_cast<int64_t>(id);
- LOG(INFO) << "Master ID:" << masterId;
- break;
- } else {
- LOG(INFO) << "Oops! We're dropping a message since "
- << "we haven't received an identifier yet!";
- }
- }
+ while (receive() != GOT_MASTER_ID)
+ LOG(INFO) << "Oops! We're dropping a message since "
+ << "we haven't received an identifier yet!";
+ string id;
+ unpack<GOT_MASTER_ID>(id);
+ masterId = lexical_cast<long>(id);
+ LOG(INFO) << "Master ID:" << masterId;
// Create the allocator (we do this after the constructor because it
// leaks 'this').
@@ -378,7 +373,7 @@ void Master::operator () ()
} else {
// The slot offer is gone, meaning that we rescinded it or that
// the slave was lost; immediately report any tasks in it as lost
- foreach (TaskDescription &t, tasks) {
+ foreach (const TaskDescription &t, tasks) {
send(framework->pid,
pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
}
@@ -409,6 +404,9 @@ void Master::operator () ()
if (task != NULL) {
LOG(INFO) << "Asked to kill " << task << " by its framework";
killTask(task);
+ } else {
+ LOG(INFO) << "Asked to kill UNKNOWN task by its framework";
+ send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
}
}
break;
@@ -432,9 +430,9 @@ void Master::operator () ()
case S2M_REREGISTER_SLAVE: {
Slave *slave = new Slave(from(), "", elapsed());
- vector<Task> taskVec;
+ vector<Task> tasks;
unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
- slave->resources, taskVec);
+ slave->resources, tasks);
if (slave->id == "") {
slave->id = lexical_cast<string>(masterId) + "-"
@@ -443,10 +441,10 @@ void Master::operator () ()
<< "generating a new id for it.";
}
- foreach(Task &ti, taskVec) {
- Task *tip = new Task(ti);
- slave->addTask(tip);
- updateFrameworkTasks(tip);
+ foreach(const Task &t, tasks) {
+ Task *task = new Task(t);
+ slave->addTask(task);
+ updateFrameworkTasks(task);
}
// TODO(benh|alig): We should put a timeout on how long we keep
@@ -484,7 +482,11 @@ void Master::operator () ()
VLOG(1) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
if (Slave *slave = lookupSlave(sid)) {
if (Framework *framework = lookupFramework(fid)) {
- // Pass on the status update to the framework.
+ // Pass on the status update to the framework.
+ // TODO(benh): Should we really be forwarding the
+ // S2M_FT_STATUS_UPDATE message as is? This seems a little tricky
+ // because we really want to send an M2F_FT_STATUS_UPDATE
+ // message.
forward(framework->pid);
if (duplicate()) {
LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
@@ -636,6 +638,17 @@ void Master::operator () ()
break;
}
+ case M2M_FRAMEWORK_EXPIRED: {
+ FrameworkID fid;
+ unpack<M2M_FRAMEWORK_EXPIRED>(fid);
+ if (Framework *framework = lookupFramework(fid)) {
+ LOG(INFO) << "Framework failover timer expired, removing framework "
+ << framework;
+ removeFramework(framework);
+ }
+ break;
+ }
+
case PROCESS_EXIT: {
// TODO(benh): Could we get PROCESS_EXIT from a network partition?
LOG(INFO) << "Process exited: " << from();
@@ -643,7 +656,8 @@ void Master::operator () ()
FrameworkID fid = pidToFid[from()];
if (Framework *framework = lookupFramework(fid)) {
LOG(INFO) << framework << " disconnected";
- // TODO(benh): Wait for a framework failover.
+// framework->failoverTimer = new FrameworkFailoverTimer(self(), fid);
+// link(spawn(framework->failoverTimer));
removeFramework(framework);
}
} else if (pidToSid.find(from()) != pidToSid.end()) {
@@ -652,6 +666,16 @@ void Master::operator () ()
LOG(INFO) << slave << " disconnected";
removeSlave(slave);
}
+ } else {
+ foreachpair (_, Framework *framework, frameworks) {
+ if (framework->failoverTimer != NULL &&
+ framework->failoverTimer->getPID() == from()) {
+ LOG(INFO) << "Lost framework failover timer, removing framework "
+ << framework;
+ removeFramework(framework);
+ break;
+ }
+ }
}
break;
}
Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun 5 06:42:10 2011
@@ -72,11 +72,43 @@ const double HEARTBEAT_INTERVAL = 2;
// Acceptable time since we saw the last heartbeat (four heartbeats).
const double HEARTBEAT_TIMEOUT = HEARTBEAT_INTERVAL * 4;
+// Time to wait for a framework to fail over (TODO(benh): Make configurable).
+const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60;
+
// Some forward declarations
class Slave;
class Allocator;
+class FrameworkFailoverTimer : public Tuple<Process>
+{
+private:
+ const PID master;
+ const FrameworkID fid;
+
+protected:
+ void operator () ()
+ {
+ link(master);
+ do {
+ switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
+ case PROCESS_TIMEOUT:
+ send(master, pack<M2M_FRAMEWORK_EXPIRED>(fid));
+ return;
+ case PROCESS_EXIT:
+ return;
+ case M2M_SHUTDOWN:
+ return;
+ }
+ } while (true);
+ }
+
+public:
+ FrameworkFailoverTimer(const PID &_master, FrameworkID _fid)
+ : master(_master), fid(_fid) {}
+};
+
+
// Resources offered on a particular slave.
struct SlaveResources
{
@@ -102,7 +134,7 @@ struct SlotOffer
// An connected framework.
struct Framework
-{
+{
PID pid;
FrameworkID id;
bool active; // Turns false when framework is being removed
@@ -120,8 +152,22 @@ struct Framework
// or 0 for slaves that we want to keep filtered forever
unordered_map<Slave *, double> slaveFilter;
+ // A failover timer if the connection to this framework is lost.
+ FrameworkFailoverTimer *failoverTimer;
+
Framework(const PID &_pid, FrameworkID _id, double time)
- : pid(_pid), id(_id), active(true), connectTime(time) {}
+ : pid(_pid), id(_id), active(true), connectTime(time),
+ failoverTimer(NULL) {}
+
+ ~Framework()
+ {
+ if (failoverTimer != NULL) {
+ Process::post(failoverTimer->self(), M2M_SHUTDOWN);
+ Process::wait(failoverTimer->self());
+ delete failoverTimer;
+ failoverTimer = NULL;
+ }
+ }
Task * lookupTask(TaskID tid)
{
@@ -297,7 +343,7 @@ public:
state::MasterState *getState();
OfferID makeOffer(Framework *framework,
- const vector<SlaveResources>& resources);
+ const vector<SlaveResources>& resources);
void rescindOffer(SlotOffer *offer);
Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun 5 06:42:10 2011
@@ -92,17 +92,18 @@ enum MessageType {
#endif /* __sun__ */
/* Internal to master */
- M2M_GET_STATE, // Used by web UI
+ M2M_GET_STATE, // Used by web UI
M2M_GET_STATE_REPLY,
- M2M_TIMER_TICK, // Timer for expiring filters etc
- M2M_SHUTDOWN, // Used in tests to shut down master
+ M2M_TIMER_TICK, // Timer for expiring filters etc
+ M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
+ M2M_SHUTDOWN, // Used in tests to shut down master
/* Internal to slave */
- S2S_GOT_MASTER, // Used when looking up master with ZooKeeper
- S2S_GET_STATE, // Used by web UI
+ S2S_GOT_MASTER, // Used when looking up master with ZooKeeper
+ S2S_GET_STATE, // Used by web UI
S2S_GET_STATE_REPLY,
- S2S_CHILD_EXIT, // Sent by reaper process
- S2S_SHUTDOWN, // Used in tests to shut down slave
+ S2S_CHILD_EXIT, // Sent by reaper process
+ S2S_SHUTDOWN, // Used in tests to shut down slave
MESOS_MESSAGES,
};
@@ -332,7 +333,10 @@ TUPLE(M2M_GET_STATE_REPLY,
TUPLE(M2M_TIMER_TICK,
());
-
+
+TUPLE(M2M_FRAMEWORK_EXPIRED,
+ (FrameworkID));
+
TUPLE(M2M_SHUTDOWN,
());
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 06:42:10 2011
@@ -511,8 +511,17 @@ string Slave::getWorkDirectory(Framework
} else {
workDir = "work";
}
+
+ // TODO(benh): Clean this up, check for errors, etc.
+ time_t rawtime;
+ struct tm* timeinfo;
+ time(&rawtime);
+ timeinfo = localtime(&rawtime);
+ char timestr[32];
+ strftime(timestr, sizeof(timestr), "%Y-%m-%d-%H:%M", timeinfo);
+
ostringstream fwDir;
- fwDir << workDir << "/slave-" << id << "/fw-" << fid;
+ fwDir << workDir << "/slave-" << id << "/fw-" << fid << "/" << timestr;
return fwDir.str();
}