You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 07:24:41 UTC

svn commit: r1131744 - in /incubator/mesos/trunk/src: master.cpp messages.hpp nexus_sched.cpp slave.cpp

Author: benh
Date: Sun Jun  5 05:24:41 2011
New Revision: 1131744

URL: http://svn.apache.org/viewvc?rev=1131744&view=rev
Log:
Getting code deployable with and without ZooKeeper.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/nexus_sched.cpp
    incubator/mesos/trunk/src/slave.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 05:24:41 2011
@@ -148,18 +148,25 @@ Master::Master(const string& _allocatorT
 
 Master::~Master()
 {
-  if (isFT && masterDetector != NULL)
-    delete masterDetector;
   LOG(INFO) << "Shutting down master";
+
+  if (masterDetector != NULL) {
+    delete masterDetector;
+    masterDetector = NULL;
+  }
+
   delete allocator;
+
   foreachpair (_, Framework *framework, frameworks) {
     foreachpair(_, TaskInfo *task, framework->tasks)
       delete task;
     delete framework;
   }
+
   foreachpair (_, Slave *slave, slaves) {
     delete slave;
   }
+
   foreachpair (_, SlotOffer *offer, slotOffers) {
     delete offer;
   }
@@ -281,9 +288,27 @@ void Master::operator () ()
 {
   LOG(INFO) << "Master started at nexus://" << self();
 
-  LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-  masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
-  
+  if (isFT) {
+    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+    masterDetector = new MasterDetector(zkServers, ZNODE, self(), true);
+  } else {
+    send(self(), pack<GOT_MASTER_SEQ>("0"));
+  }
+
+  // Block until we get a sequence identifier; clear the flag once it arrives so the loop exits.
+  for (bool waitingForSeq = true; waitingForSeq; ) {
+    switch (receive()) {
+      case GOT_MASTER_SEQ: {
+	string mySeq;
+	unpack<GOT_MASTER_SEQ>(mySeq);
+	masterId = lexical_cast<long>(mySeq);
+	LOG(INFO) << "Master ID:" << masterId;
+	waitingForSeq = false;
+	break;
+      }
+    }
+  }
+
   allocator = createAllocator();
   if (!allocator)
     LOG(FATAL) << "Unrecognized allocator type: " << allocatorType;
@@ -294,15 +319,6 @@ void Master::operator () ()
   while (true) {
     switch (receive()) {
 
-    case GOT_MASTER_SEQ: {
-      // TODO(benh|alig): NEED TO GET SEQ BEFORE ANYONE ELSE CONNECTS!
-      string mySeq;
-      unpack<GOT_MASTER_SEQ>(mySeq);
-      masterId = lexical_cast<long>(mySeq);
-      LOG(INFO) << "Master ID:" << masterId;
-      break;
-    }
-
     case F2M_REGISTER_FRAMEWORK: {
       FrameworkID fid = lexical_cast<string>(masterId) + "-" + lexical_cast<string>(nextFrameworkId++);
 

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 05:24:41 2011
@@ -15,6 +15,7 @@
 #include "foreach.hpp"
 #include "task_info.hpp"
 
+
 namespace nexus { namespace internal {
 
 enum MessageType {

Modified: incubator/mesos/trunk/src/nexus_sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/nexus_sched.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/nexus_sched.cpp (original)
+++ incubator/mesos/trunk/src/nexus_sched.cpp Sun Jun  5 05:24:41 2011
@@ -76,9 +76,8 @@ private:
   MasterDetector *masterDetector;
   FTMessaging *ftMsg;
 
-  typedef unordered_map< SlaveID, PID > SidToPidMap;
-  unordered_map< OfferID, SidToPidMap > savedOffers;
-  SidToPidMap sidToPidMap;
+  unordered_map<OfferID, unordered_map<SlaveID, PID> > savedOffers;
+  unordered_map<SlaveID, PID> savedSlavePids;
 
   volatile bool terminate;
 
@@ -114,7 +113,8 @@ public:
       terminate(false),
       frameworkName(_frameworkName),
       execInfo(_execInfo),
-      masterDetector(NULL)
+      masterDetector(NULL),
+      ftMsg(FTMessaging::getInstance())
 {
   pair<UrlProcessor::URLType, string> urlPair = UrlProcessor::process(_master);
   if (urlPair.first == UrlProcessor::ZOO) {
@@ -129,9 +129,15 @@ public:
       cerr << "Failed to parse URL for master: " << _master <<endl;
       exit(1);
     }
-  } 
+  }
+}
 
-  ftMsg = FTMessaging::getInstance();
+~SchedulerProcess()
+{
+  if (masterDetector != NULL) {
+    delete masterDetector;
+    masterDetector = NULL;
+  }
 }
 
 protected:
@@ -143,8 +149,12 @@ protected:
       fatal("failed to get username information");
     string user(passwd->pw_name);
 
-    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-    masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+    if (isFT) {
+      LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+      masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+    } else {
+      send(self(), pack<NEW_MASTER_DETECTED>(0, master));
+    }
 
     while(true) {
       // Rather than send a message to this process when it is time to
@@ -165,7 +175,8 @@ protected:
       switch(receive(FT_TIMEOUT)) {
       // TODO(benh): We need to break the receive loop every so often
       // to check if 'terminate' has been set .. but rather than use a
-      // timeout in receive, maybe we should send a message.
+      // timeout in receive, it would be nice to send a message, but
+      // see above.
 
       case NEW_MASTER_DETECTED: {
 	string masterSeq;
@@ -206,9 +217,10 @@ protected:
         vector<SlaveOffer> offs;
         unpack<M2F_SLOT_OFFER>(oid, offs);
         
-        SidToPidMap &tmpMap = savedOffers[ oid ];
+	// Save all the slave PIDs found in the offer so later we can
+	// send framework messages directly.
         foreach(const SlaveOffer &offer, offs) {
-          tmpMap[ offer.slaveId ] = offer.slavePid;
+	  savedOffers[oid][offer.slaveId] = offer.slavePid;
         }
 
         invoke(bind(&Scheduler::resourceOffer, sched, driver, oid, ref(offs)));
@@ -221,27 +233,29 @@ protected:
         Params params;
         unpack<F2F_SLOT_OFFER_REPLY>(oid, tasks, params);
 
-	// Save only the PIDs we need for sending framework messages.
+	// Keep only the slave PIDs where we run tasks so we can send
+	// framework messages directly.
         foreach(const TaskDescription &task, tasks) {
-          sidToPidMap[ task.slaveId ] = savedOffers[ oid ][ task.slaveId ];
-
+          savedSlavePids[task.slaveId] = savedOffers[oid][task.slaveId];
         }
-        savedOffers.erase( oid );
 
-	// TODO(alig|benh): Use new API -> rsend(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
-	// TODO(alig): Improve the following comment.
-	// Do a reliable send here because ...
+	// Remove the offer since we saved all the PIDs we might use.
+        savedOffers.erase(oid);
+
+	// TODO(alig|benh): Walk through scenario if the master dies
+	// after it sends out M2S_RUN_TASK messages?
+
         if (isFT) {
-          TimeoutListener *tListener = 
-            new TimeoutListener(this, tasks);
+          TimeoutListener *tListener = new TimeoutListener(this, tasks);
 
           string ftId = ftMsg->getNextId();
           DLOG(INFO) << "Sending reliably reply to slot offer for msg " << ftId;
-          ftMsg->reliableSend( ftId,
-                               pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
-                               tListener);
-        } else
+          ftMsg->reliableSend(ftId,
+			      pack<F2M_FT_SLOT_OFFER_REPLY>(ftId, self(), fid, oid, tasks, params),
+			      tListener);
+        } else {
           send(master, pack<F2M_SLOT_OFFER_REPLY>(fid, oid, tasks, params));
+	}
 	
         break;
       }
@@ -249,15 +263,12 @@ protected:
       case F2F_FRAMEWORK_MESSAGE: {
         FrameworkMessage msg;
         unpack<F2F_FRAMEWORK_MESSAGE>(msg);
-
-        /*
-        if (isFT) {
-          string ftId = ftMsg->getNextId();
-          ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
-        } else
-          send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
-        */
-        send( sidToPidMap[ msg.slaveId ], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
+//         if (isFT) {
+//           string ftId = ftMsg->getNextId();
+//           ftMsg->reliableSend( ftId, pack<F2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), fid, msg));
+//         } else
+//           send(master, pack<F2M_FRAMEWORK_MESSAGE>(fid, msg));
+        send(savedSlavePids[msg.slaveId], pack<M2S_FRAMEWORK_MESSAGE>(fid, msg));
         break;
       }
 
@@ -284,7 +295,6 @@ protected:
         break;
       }
 
-
       case M2F_STATUS_UPDATE: {
         TaskID tid;
         TaskState state;
@@ -319,7 +329,7 @@ protected:
       case M2F_LOST_SLAVE: {
         SlaveID sid;
         unpack<M2F_LOST_SLAVE>(sid);
-        sidToPidMap.erase(sid);
+	savedSlavePids.erase(sid);
         invoke(bind(&Scheduler::slaveLost, sched, driver, sid));
         break;
       }
@@ -333,14 +343,12 @@ protected:
       }
 
       case PROCESS_EXIT: {
-        const char* message = "Connection to master failed";
-	 if (isFT) 
-	   LOG(WARNING) << "Connection to master failed. Waiting for a new master to be elected.";
-	 else 
-	   {
-	      LOG(ERROR) << "Connection to master failed. Exiting. Consider running Nexus in FT mode!";
-	      invoke(bind(&Scheduler::error, sched, driver, -1, message));
-	   }
+	if (isFT) {
+	  LOG(WARNING) << "Connection to master lost .. waiting for new master.";
+	} else {
+	  const char* message = "Connection to master failed";
+	  invoke(bind(&Scheduler::error, sched, driver, -1, message));
+	}
         break;
       }
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131744&r1=1131743&r2=1131744&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 05:24:41 2011
@@ -107,7 +107,10 @@ Slave::Slave(const string &_master, Reso
 
 Slave::~Slave()
 {
-  delete masterDetector;
+  if (masterDetector != NULL) {
+    delete masterDetector;
+    masterDetector = NULL;
+  }
 }
 
 
@@ -141,8 +144,12 @@ void Slave::operator () ()
 {
   LOG(INFO) << "Slave started at " << self();
 
-  LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
-  masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+  if (isFT) {
+    LOG(INFO) << "Connecting to ZooKeeper at " << zkServers;
+    masterDetector = new MasterDetector(zkServers, ZNODE, self(), false);
+  } else {
+    send(self(), pack<NEW_MASTER_DETECTED>(0, master));
+  }
 
   // Get our hostname
   char buf[256];
@@ -365,13 +372,11 @@ void Slave::operator () ()
         unpack<E2S_FRAMEWORK_MESSAGE>(fid, message);
         // Set slave ID in case framework omitted it
         message.slaveId = this->id;
-        /*
-        if (isFT) {
-          string ftId = ftMsg->getNextId();
-          ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
-        } else
-          send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
-        */
+//         if (isFT) {
+//           string ftId = ftMsg->getNextId();
+//           ftMsg->reliableSend(ftId, pack<S2M_FT_FRAMEWORK_MESSAGE>(ftId, self(), id, fid, message));
+//         } else
+//           send(master, pack<S2M_FRAMEWORK_MESSAGE>(id, fid, message));
         send(getFramework(fid)->fwPid, pack<M2F_FRAMEWORK_MESSAGE>(message));
         break;
       }
@@ -386,16 +391,15 @@ void Slave::operator () ()
 
         if (from() == master) {
 	  // TODO: Fault tolerance!
-	   if (isFT)
-	     LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected."; 
-	   else 
-	     {
-		LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
-		if (isolationModule != NULL)
-		  delete isolationModule;
-		// TODO: Shut down executors?
-		return;
-	     }
+	  if (isFT) {
+	    LOG(WARNING) << "FT: Master disconnected! Waiting for a new master to be elected."; 
+	  } else {
+	    LOG(ERROR) << "Master disconnected! Exiting. Consider running Nexus in FT mode!";
+	    if (isolationModule != NULL)
+	      delete isolationModule;
+	    // TODO: Shut down executors?
+	    return;
+	  }
 	}
 
         foreachpair (_, Executor *ex, executors) {