You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 05:29:52 UTC

svn commit: r1131613 - in /incubator/mesos/trunk/src: master.cpp master.hpp nexus_sched.cpp slave.cpp slave.hpp

Author: benh
Date: Sun Jun  5 03:29:52 2011
New Revision: 1131613

URL: http://svn.apache.org/viewvc?rev=1131613&view=rev
Log:
Bugfixes. Slaves and frameworks can now be started before the master. Slaves and frameworks now start with an empty ID rather than -1. Renamed some variables for correct, consistent capitalization (zkservers/zkserver -> zkServers). Removed an unnecessary assignment in the leader listener that could lead to race conditions.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/master.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/slave.cpp
    incubator/mesos/trunk/src/slave.hpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 03:29:52 2011
@@ -119,7 +119,7 @@ Master::Master(const string &zk)
     pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
     if (urlPair.first == UrlProcessor::ZOO) {
       isFT = true;
-      zkservers = urlPair.second;
+      zkServers = urlPair.second;
     } else {
       LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
       exit(1);
@@ -137,7 +137,7 @@ Master::Master(const string& _allocatorT
     pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(zk);
     if (urlPair.first == UrlProcessor::ZOO) {
       isFT = true;
-      zkservers = urlPair.second;
+      zkServers = urlPair.second;
     } else {
       LOG(ERROR) << "Failed to parse URL for ZooKeeper servers. URL must start with zoo:// or zoofile://";
       exit(1);
@@ -284,10 +284,10 @@ void Master::operator () ()
   LOG(INFO) << "Master started at nexus://" << self();
 
   if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkservers;
+    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
     ostringstream lpid;
     lpid << self();
-    leaderDetector = new LeaderDetector(zkservers, true, lpid.str());
+    leaderDetector = new LeaderDetector(zkServers, true, lpid.str());
     
     string myLeaderSeq = leaderDetector->getMySeq();
     if (myLeaderSeq == "") {
@@ -334,6 +334,12 @@ void Master::operator () ()
                                        framework->name,
                                        framework->user,
                                        framework->executorInfo);
+
+      if (framework->id == "") {
+        DLOG(INFO) << "Framework reconnecting without a FrameworkID, generating new id";
+        framework->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
+      }
+
       LOG(INFO) << "Registering " << framework << " at " << framework->pid;
       frameworks[framework->id] = framework;
       pidToFid[framework->pid] = framework->id;
@@ -480,6 +486,11 @@ void Master::operator () ()
       unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
       				   slave->resources, taskVec);
 
+      if (slave->id == "") {
+        slave->id = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextSlaveId++);
+        DLOG(WARNING) << "Slave re-registered without a SlaveID, generating a new id for it.";
+      }
+
       foreach(TaskInfo &ti, taskVec) {
         TaskInfo *tip = new TaskInfo(ti);
 	slave->addTask(tip);

Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun  5 03:29:52 2011
@@ -262,7 +262,7 @@ class Master : public Tuple<Process>
 {
 protected:
   bool isFT;
-  string zkservers;
+  string zkServers;
   LeaderDetector *leaderDetector;
   unordered_map<FrameworkID, Framework *> frameworks;
   unordered_map<SlaveID, Slave *> slaves;

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 03:29:52 2011
@@ -74,7 +74,7 @@ private:
   string frameworkName;
   ExecutorInfo execInfo;
   bool isFT;
-  string zkservers;
+  string zkServers;
   LeaderDetector *leaderDetector;
   FTMessaging *ftMsg;
 
@@ -85,15 +85,13 @@ private:
 
   class SchedLeaderListener : public LeaderListener {
   public:
-    // TODO(alig): make thread safe
+    // Need to be thread safe. Currently does not use any shared variables. 
     SchedLeaderListener(SchedulerProcess *s, PID pp) : parent(s), parentPID(pp) {}
     
     virtual void newLeaderElected(const string &zkId, const string &pidStr) {
       if (zkId != "") {
 	LOG(INFO) << "Leader listener detected leader at " << pidStr <<" with ephemeral id:" << zkId;
 	
-	parent->zkservers = pidStr;
-
 	LOG(INFO) << "Sending message to parent " << parentPID << " about new leader";
 	parent->send(parentPID, parent->pack<LE_NEWLEADER>(pidStr));
 
@@ -134,7 +132,7 @@ public:
                    const ExecutorInfo& _execInfo)
     : driver(_driver),
       sched(_sched),
-      fid("-1"),
+      fid(""),
       terminate(false),
       frameworkName(_frameworkName),
       execInfo(_execInfo),
@@ -144,7 +142,7 @@ public:
   pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
   if (urlPair.first == UrlProcessor::ZOO) {
     isFT = true;
-    zkservers = urlPair.second;
+    zkServers = urlPair.second;
     //  } else if (urlPair.first == UrlProcessor::NEXUS) {
   } else {
     isFT = false; 
@@ -169,8 +167,8 @@ protected:
     string user(passwd->pw_name);
 
     if (isFT) {
-      LOG(INFO) << "Connecting to ZooKeeper at " << zkservers;
-      leaderDetector = new LeaderDetector(zkservers, false, "", NULL);
+      LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+      leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
       leaderDetector->setListener(&schedLeaderListener); // use this instead of constructor to avoid race condition
 
       string leaderPidStr = leaderDetector->getCurrentLeaderPID();
@@ -314,7 +312,12 @@ protected:
 	LOG(INFO) << "Connecting to Nexus master at " << master;
 	link(master);
         ftMsg->setMasterPid(master);
-	send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+
+        if (fid != "")  // actual re-register to a new master leader
+          send(master, pack<F2M_REREGISTER_FRAMEWORK>(fid, frameworkName, user, execInfo));
+        else            // not really a re-register, scheduler started before master
+          send(master, pack<F2M_REGISTER_FRAMEWORK>(frameworkName, user, execInfo));
+
 	break;
       }
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 03:29:52 2011
@@ -65,14 +65,14 @@ public:
 
 Slave::Slave(const string &_master, Resources _resources, bool _local)
   : leaderDetector(NULL), 
-    resources(_resources), local(_local), id("-1"),
+    resources(_resources), local(_local), id(""),
     isolationType("process"), isolationModule(NULL), slaveLeaderListener(this, getPID())
 {
   ftMsg = FTMessaging::getInstance();
   pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
   if (urlPair.first == UrlProcessor::ZOO) {
     isFT = true;
-    zkserver = urlPair.second;
+    zkServers = urlPair.second;
     //  } else if (urlPair.first == UrlProcessor::NEXUS) {
   } else {
     isFT = false;
@@ -89,14 +89,14 @@ Slave::Slave(const string &_master, Reso
 Slave::Slave(const string &_master, Resources _resources, bool _local,
 	     const string &_isolationType)
   : leaderDetector(NULL), 
-    resources(_resources), local(_local), id("-1"),
+    resources(_resources), local(_local), id(""),
     isolationType(_isolationType), isolationModule(NULL), slaveLeaderListener(this, getPID())
 {
   ftMsg = FTMessaging::getInstance();
   pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
   if (urlPair.first == UrlProcessor::ZOO) {
     isFT = true;
-    zkserver = urlPair.second;
+    zkServers = urlPair.second;
   } else if (urlPair.first == UrlProcessor::NEXUS) {
     isFT = false;
     istringstream iss(urlPair.second);
@@ -144,8 +144,8 @@ void Slave::operator () ()
   LOG(INFO) << "Slave started at " << self();
 
   if (isFT) {
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkserver;
-    leaderDetector = new LeaderDetector(zkserver, false, "", NULL);
+    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+    leaderDetector = new LeaderDetector(zkServers, false, "", NULL);
     leaderDetector->setListener(&slaveLeaderListener); // use this instead of constructor to avoid race condition
 
     string leaderPidStr = leaderDetector->getCurrentLeaderPID();
@@ -202,7 +202,9 @@ void Slave::operator () ()
         FrameworkID tmpfid;
         unpack<M2S_REGISTER_REPLY>(tmpfid);
         LOG(INFO) << "RE-registered with master; given slave ID " << tmpfid << " had "<< this->id;
-        link(spawn(new Heart(master, this->getPID(), this->id)));
+        if (this->id == "")
+          this->id = tmpfid;
+        link(spawn(new Heart(master, getPID(), this->id)));
         break;
       }
       
@@ -432,9 +434,11 @@ void Slave::operator () ()
 	    taskVec.push_back(ti);
 	  }
 	}
-	//alibandali
 
-	send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+        if (id != "") // actual re-register
+          send(master, pack<S2M_REREGISTER_SLAVE>(id, hostname, publicDns, resources, taskVec));
+        else          // slave started before master
+          send(master, pack<S2M_REGISTER_SLAVE>(hostname, publicDns, resources));
 	
 	break;
       }

Modified: incubator/mesos/trunk/src/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.hpp?rev=1131613&r1=1131612&r2=1131613&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave.hpp Sun Jun  5 03:29:52 2011
@@ -165,7 +165,7 @@ public:
   typedef unordered_map<FrameworkID, Executor*> ExecutorMap;
   
   bool isFT;
-  string zkserver;
+  string zkServers;
   LeaderDetector *leaderDetector;
   PID master;
   SlaveID id;
@@ -183,15 +183,13 @@ public:
 
   class SlaveLeaderListener : public LeaderListener {
   public:
-    // TODO(alig): make thread safe
+    // Need to be thread safe. Currently does not use any shared variables. 
     SlaveLeaderListener(Slave *s, PID pp) : parent(s), parentPID(pp) {}
     
     virtual void newLeaderElected(const string &zkId, const string &pidStr) {
       if (zkId != "") {
 	LOG(INFO) << "Leader listener detected leader at " << pidStr <<" with ephemeral id:" << zkId;
 	
-	parent->zkserver = pidStr;
-
 	LOG(INFO) << "Sending message to parent " << parentPID << " about new leader";
 	parent->send(parentPID, parent->pack<LE_NEWLEADER>(pidStr));