You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:29:52 UTC
svn commit: r1131613 - in /incubator/mesos/trunk/src: master.cpp master.hpp
nexus_sched.cpp slave.cpp slave.hpp
Author: benh
Date: Sun Jun 5 03:29:52 2011
New Revision: 1131613
URL: http://svn.apache.org/viewvc?rev=1131613&view=rev
Log:
Bugfixes. Slaves and frameworks can now be started before the master. Slaves and frameworks now start with an empty ID rather than "-1". Fixed the capitalization of some variable names (zkservers/zkserver -> zkServers). Removed an unnecessary assignment in the leader listeners (it overwrote the ZooKeeper server string with the new leader's PID) that could lead to race conditions.
Modified:
incubator/mesos/trunk/src/master.cpp
incubator/mesos/trunk/src/master.hpp
incubator/mesos/trunk/src/nexus_sched.cpp
incubator/mesos/trunk/src/slave.cpp
incubator/mesos/trunk/src/slave.hpp
Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun 5 03:29:52 2011
@@ -119,7 +119,7 @@ Master::Master(const string &zk)
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
if (urlPair.first == UrlProcessor::ZOO) {
isFT = true;
- zkservers = urlPair.second;
+ zkServers = urlPair.second;
} else {
LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
exit(1);
@@ -137,7 +137,7 @@ Master::Master(const string& _allocatorT
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
if (urlPair.first == UrlProcessor::ZOO) {
isFT = true;
- zkservers = urlPair.second;
+ zkServers = urlPair.second;
} else {
LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
exit(1);
@@ -284,10 +284,10 @@ void Master::operator () ()
LOG(INFO) << "Master started at nexus://" << self();
if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkservers;
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
ostringstream lpid;
lpid << self();
- leaderDetector = new LeaderDetector(zkservers, true, lpid.str());
+ leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
string myLeaderSeq = leaderDetector->getMySeq();
if (myLeaderSeq == "") {
@@ -334,6 +334,12 @@ void Master::operator () ()
framework->name,
framework->user,
framework->executorInfo);
+
+ if (framework->id == "") {
+ DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
+ framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+ }
+
LOG(INFO) << "Registering " << framework << " at " << framework->pid;
frameworks[framework->id] = framework;
pidToFid[framework->pid] = framework->id;
@@ -480,6 +486,11 @@ void Master::operator () ()
unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
slave->resources, taskVec);
+ if (slave->id == "") {
+ slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+ DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
+ }
+
foreach(TaskInfo &ti, taskVec) {
TaskInfo *tip = new TaskInfo(ti);
slave->addTask(tip);
Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun 5 03:29:52 2011
@@ -262,7 +262,7 @@ class Master : public Tuple<Process>
{
protected:
bool isFT;
- string zkservers;
+ string zkServers;
LeaderDetector *leaderDetector;
unordered_map<FrameworkID, Framework *> frameworks;
unordered_map<SlaveID, Slave *> slaves;
Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun 5 03:29:52 2011
@@ -74,7 +74,7 @@ private:
string frameworkName;
ExecutorInfo execInfo;
bool isFT;
- string zkservers;
+ string zkServers;
LeaderDetector *leaderDetector;
FTMessaging *ftMsg;
@@ -85,15 +85,13 @@ private:
class SchedLeaderListener : public LeaderListener {
public:
- // TODO(alig): make thread safe
+ // Need to be thread safe. Currently does not use any shared variables.
SchedLeaderListener(SchedulerProcess *s, PID pp) : parent(s), parentPID(pp) {}
virtual void newLeaderElected(const string &zkId, const string &pidStr) {
if (zkId != "") {
LOG(INFO) << "Leader listener detected leader at " << pidStr <<" with ephemeral id:" << zkId;
- parent->zkservers = pidStr;
-
LOG(INFO) << "Sending message to parent " << parentPID << " about new leader";
parent->send(parentPID, parent->pack<LE_NEWLEADER>(pidStr));
@@ -134,7 +132,7 @@ public:
const ExecutorInfo& _execInfo)
: driver(_driver),
sched(_sched),
- fid("-1"),
+ fid(""),
terminate(false),
frameworkName(_frameworkName),
execInfo(_execInfo),
@@ -144,7 +142,7 @@ public:
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
if (urlPair.first == UrlProcessor::ZOO) {
isFT = true;
- zkservers = urlPair.second;
+ zkServers = urlPair.second;
// } else if (urlPair.first == UrlProcessor::NEXUS) {
} else {
isFT = false;
@@ -169,8 +167,8 @@ protected:
string user(passwd->pw_name);
if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkservers;
- leaderDetector = new LeaderDetector(zkservers, false, "", NULL);
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
leaderDetector->setListener(&schedLeaderListener); // use this instead of constructor to avoid race condition
string leaderPidStr = leaderDetector->getCurrentLeaderPID();
@@ -314,7 +312,12 @@ protected:
LOG(INFO) << "Connecting to Nexus master at " << master;
link(master);
ftMsg->setMasterPid(master);
- send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+
+ if (fid != "") // actual re-register to a new master leader
+ send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+ else // not really a re-register, scheduler started before master
+ send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
+
break;
}
Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun 5 03:29:52 2011
@@ -65,14 +65,14 @@ public:
Slave::Slave(const string &_master, Resources _resources, bool _local)
: leaderDetector(NULL),
- resources(_resources), local(_local), id("-1"),
+ resources(_resources), local(_local), id(""),
isolationType("process"), isolationModule(NULL), slaveLeaderListener(this, getPID())
{
ftMsg = FTMessaging::getInstance();
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
if (urlPair.first == UrlProcessor::ZOO) {
isFT = true;
- zkserver = urlPair.second;
+ zkServers = urlPair.second;
// } else if (urlPair.first == UrlProcessor::NEXUS) {
} else {
isFT = false;
@@ -89,14 +89,14 @@ Slave::Slave(const string &_master, Reso
Slave::Slave(const string &_master, Resources _resources, bool _local,
const string &_isolationType)
: leaderDetector(NULL),
- resources(_resources), local(_local), id("-1"),
+ resources(_resources), local(_local), id(""),
isolationType(_isolationType), isolationModule(NULL), slaveLeaderListener(this, getPID())
{
ftMsg = FTMessaging::getInstance();
pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
if (urlPair.first == UrlProcessor::ZOO) {
isFT = true;
- zkserver = urlPair.second;
+ zkServers = urlPair.second;
} else if (urlPair.first == UrlProcessor::NEXUS) {
isFT = false;
istringstream iss(urlPair.second);
@@ -144,8 +144,8 @@ void Slave::operator () ()
LOG(INFO) << "Slave started at " << self();
if (isFT) {
- LOG(INFO) << "Connecting to ZooKeeper at " << zkserver;
- leaderDetector = new LeaderDetector(zkserver, false, "", NULL);
+ LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+ leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
leaderDetector->setListener(&slaveLeaderListener); // use this instead of constructor to avoid race condition
string leaderPidStr = leaderDetector->getCurrentLeaderPID();
@@ -202,7 +202,9 @@ void Slave::operator () ()
FrameworkID tmpfid;
unpack<M2S_REGISTER_REPLY>(tmpfid);
LOG(INFO) << "RE-registered with master; given slave ID " << tmpfid << " had "<< this->id;
- link(spawn(new Heart(master, this->getPID(), this->id)));
+ if (this->id == "")
+ this->id = tmpfid;
+ link(spawn(new Heart(master, getPID(), this->id)));
break;
}
@@ -432,9 +434,11 @@ void Slave::operator () ()
taskVec.push_back(ti);
}
}
- //alibandali
- send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+ if (id != "") // actual re-register
+ send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+ else // slave started before master
+ send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
break;
}
Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun 5 03:29:52 2011
@@ -165,7 +165,7 @@ public:
typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
bool isFT;
- string zkserver;
+ string zkServers;
LeaderDetector *leaderDetector;
PID master;
SlaveID id;
@@ -183,15 +183,13 @@ public:
class SlaveLeaderListener : public LeaderListener {
public:
- // TODO(alig): make thread safe
+ // Need to be thread safe. Currently does not use any shared variables.
SlaveLeaderListener(Slave *s, PID pp) : parent(s), parentPID(pp) {}
virtual void newLeaderElected(const string &zkId, const string &pidStr) {
if (zkId != "") {
LOG(INFO) << "Leader listener detected leader at " << pidStr <<" with ephemeral id:" << zkId;
- parent->zkserver = pidStr;
-
LOG(INFO) << "Sending message to parent " << parentPID << " about new leader";
parent->send(parentPID, parent->pack<LE_NEWLEADER>(pidStr));