You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 08:42:10 UTC

svn commit: r1131961 - in /incubator/mesos/trunk/src: master.cpp master.hpp messages.hpp slave.cpp

Author: benh
Date: Sun Jun  5 06:42:10 2011
New Revision: 1131961

URL: http://svn.apache.org/viewvc?rev=1131961&view=rev
Log:
Manual merge of a few minor changes made while running at Twitter.

Modified:
    incubator/mesos/trunk/src/master.cpp
    incubator/mesos/trunk/src/master.hpp
    incubator/mesos/trunk/src/messages.hpp
    incubator/mesos/trunk/src/slave.cpp

Modified: incubator/mesos/trunk/src/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.cpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.cpp (original)
+++ incubator/mesos/trunk/src/master.cpp Sun Jun  5 06:42:10 2011
@@ -35,7 +35,7 @@ namespace {
 class AllocatorTimer : public Tuple<Process>
 {
 private:
-  PID master;
+  const PID master;
 
 protected:
   void operator () ()
@@ -271,18 +271,13 @@ void Master::operator () ()
   LOG(INFO) << "Master started at mesos://" << self();
 
   // Don't do anything until we get an identifier.
-  while (true) {
-    if (receive() == GOT_MASTER_ID) {
-      string id;
-      unpack<GOT_MASTER_ID>(id);
-      masterId = lexical_cast<int64_t>(id);
-      LOG(INFO) << "Master ID:" << masterId;
-      break;
-    } else {
-      LOG(INFO) << "Oops! We're dropping a message since "
-		<< "we haven't received an identifier yet!";
-    }
-  }
+  while (receive() != GOT_MASTER_ID)
+    LOG(INFO) << "Oops! We're dropping a message since "
+              << "we haven't received an identifier yet!";  
+  string id;
+  unpack<GOT_MASTER_ID>(id);
+  masterId = lexical_cast<long>(id);
+  LOG(INFO) << "Master ID:" << masterId;
 
   // Create the allocator (we do this after the constructor because it
   // leaks 'this').
@@ -378,7 +373,7 @@ void Master::operator () ()
         } else {
           // The slot offer is gone, meaning that we rescinded it or that
           // the slave was lost; immediately report any tasks in it as lost
-          foreach (TaskDescription &t, tasks) {
+          foreach (const TaskDescription &t, tasks) {
             send(framework->pid,
                  pack<M2F_STATUS_UPDATE>(t.taskId, TASK_LOST, ""));
           }
@@ -409,6 +404,9 @@ void Master::operator () ()
         if (task != NULL) {
           LOG(INFO) << "Asked to kill " << task << " by its framework";
           killTask(task);
+	} else {
+	  LOG(INFO) << "Asked to kill UNKNOWN task by its framework";
+	  send(framework->pid, pack<M2F_STATUS_UPDATE>(tid, TASK_LOST, ""));
         }
       }
       break;
@@ -432,9 +430,9 @@ void Master::operator () ()
 
     case S2M_REREGISTER_SLAVE: {
       Slave *slave = new Slave(from(), "", elapsed());
-      vector<Task> taskVec;
+      vector<Task> tasks;
       unpack<S2M_REREGISTER_SLAVE>(slave->id, slave->hostname, slave->publicDns,
-                                   slave->resources, taskVec);
+                                   slave->resources, tasks);
 
       if (slave->id == "") {
         slave->id = lexical_cast<string>(masterId) + "-"
@@ -443,10 +441,10 @@ void Master::operator () ()
                    << "generating a new id for it.";
       }
 
-      foreach(Task &ti, taskVec) {
-        Task *tip = new Task(ti);
-        slave->addTask(tip);
-        updateFrameworkTasks(tip);
+      foreach(const Task &t, tasks) {
+        Task *task = new Task(t);
+        slave->addTask(task);
+        updateFrameworkTasks(task);
       }
 
       // TODO(benh|alig): We should put a timeout on how long we keep
@@ -484,7 +482,11 @@ void Master::operator () ()
       VLOG(1) << "FT: prepare relay seq:"<< seq() << " from: "<< from();
       if (Slave *slave = lookupSlave(sid)) {
         if (Framework *framework = lookupFramework(fid)) {
-          // Pass on the status update to the framework.
+	  // Pass on the status update to the framework.
+	  // TODO(benh): Do we not want to forward the
+	  // S2M_FT_STATUS_UPDATE message? This seems a little tricky
+	  // because we really wanted to send the M2F_FT_STATUS_UPDATE
+	  // message.
           forward(framework->pid);
           if (duplicate()) {
             LOG(WARNING) << "FT: Locally ignoring duplicate message with id:" << seq();
@@ -636,6 +638,17 @@ void Master::operator () ()
       break;
     }
 
+    case M2M_FRAMEWORK_EXPIRED: {
+      FrameworkID fid;
+      unpack<M2M_FRAMEWORK_EXPIRED>(fid);
+      if (Framework *framework = lookupFramework(fid)) {
+	LOG(INFO) << "Framework failover timer expired, removing framework "
+		  << framework;
+	removeFramework(framework);
+      }
+      break;
+    }
+
     case PROCESS_EXIT: {
       // TODO(benh): Could we get PROCESS_EXIT from a network partition?
       LOG(INFO) << "Process exited: " << from();
@@ -643,7 +656,8 @@ void Master::operator () ()
         FrameworkID fid = pidToFid[from()];
         if (Framework *framework = lookupFramework(fid)) {
           LOG(INFO) << framework << " disconnected";
-          // TODO(benh): Wait for a framework failover.
+//  	  framework->failoverTimer = new FrameworkFailoverTimer(self(), fid);
+//  	  link(spawn(framework->failoverTimer));
           removeFramework(framework);
         }
       } else if (pidToSid.find(from()) != pidToSid.end()) {
@@ -652,6 +666,16 @@ void Master::operator () ()
           LOG(INFO) << slave << " disconnected";
           removeSlave(slave);
         }
+      } else {
+	foreachpair (_, Framework *framework, frameworks) {
+	  if (framework->failoverTimer != NULL &&
+	      framework->failoverTimer->getPID() == from()) {
+	    LOG(INFO) << "Lost framework failover timer, removing framework "
+		      << framework;
+	    removeFramework(framework);
+	    break;
+	  }
+	}
       }
       break;
     }

Modified: incubator/mesos/trunk/src/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master.hpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master.hpp (original)
+++ incubator/mesos/trunk/src/master.hpp Sun Jun  5 06:42:10 2011
@@ -72,11 +72,43 @@ const double HEARTBEAT_INTERVAL = 2;
 // Acceptable time since we saw the last heartbeat (four heartbeats).
 const double HEARTBEAT_TIMEOUT = HEARTBEAT_INTERVAL * 4;
 
+// Time to wait for a framework to failover (TODO(benh): Make configurable).
+const time_t FRAMEWORK_FAILOVER_TIMEOUT = 60;
+
 // Some forward declarations
 class Slave;
 class Allocator;
 
 
+class FrameworkFailoverTimer : public Tuple<Process>
+{
+private:
+  const PID master;
+  const FrameworkID fid;
+
+protected:
+  void operator () ()
+  {
+    link(master);
+    do {
+      switch (receive(FRAMEWORK_FAILOVER_TIMEOUT)) {
+      case PROCESS_TIMEOUT:
+	send(master, pack<M2M_FRAMEWORK_EXPIRED>(fid));
+	return;
+      case PROCESS_EXIT:
+	return;
+      case M2M_SHUTDOWN:
+	return;
+      }
+    } while (true);
+  }
+
+public:
+  FrameworkFailoverTimer(const PID &_master, FrameworkID _fid)
+    : master(_master), fid(_fid) {}
+};
+
+
 // Resources offered on a particular slave.
 struct SlaveResources
 {
@@ -102,7 +134,7 @@ struct SlotOffer
 
 // An connected framework.
 struct Framework
-{  
+{
   PID pid;
   FrameworkID id;
   bool active; // Turns false when framework is being removed
@@ -120,8 +152,22 @@ struct Framework
   // or 0 for slaves that we want to keep filtered forever
   unordered_map<Slave *, double> slaveFilter;
 
+  // A failover timer if the connection to this framework is lost.
+  FrameworkFailoverTimer *failoverTimer;
+
   Framework(const PID &_pid, FrameworkID _id, double time)
-    : pid(_pid), id(_id), active(true), connectTime(time) {}
+    : pid(_pid), id(_id), active(true), connectTime(time),
+      failoverTimer(NULL) {}
+
+  ~Framework()
+  {
+    if (failoverTimer != NULL) {
+      Process::post(failoverTimer->self(), M2M_SHUTDOWN);
+      Process::wait(failoverTimer->self());
+      delete failoverTimer;
+      failoverTimer = NULL;
+    }
+  }
   
   Task * lookupTask(TaskID tid)
   {
@@ -297,7 +343,7 @@ public:
   state::MasterState *getState();
   
   OfferID makeOffer(Framework *framework,
-                        const vector<SlaveResources>& resources);
+		    const vector<SlaveResources>& resources);
   
   void rescindOffer(SlotOffer *offer);
   

Modified: incubator/mesos/trunk/src/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messages.hpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messages.hpp (original)
+++ incubator/mesos/trunk/src/messages.hpp Sun Jun  5 06:42:10 2011
@@ -92,17 +92,18 @@ enum MessageType {
 #endif /* __sun__ */
 
   /* Internal to master */
-  M2M_GET_STATE,        // Used by web UI
+  M2M_GET_STATE,         // Used by web UI
   M2M_GET_STATE_REPLY,
-  M2M_TIMER_TICK,       // Timer for expiring filters etc
-  M2M_SHUTDOWN,         // Used in tests to shut down master
+  M2M_TIMER_TICK,        // Timer for expiring filters etc
+  M2M_FRAMEWORK_EXPIRED, // Timer for expiring frameworks
+  M2M_SHUTDOWN,          // Used in tests to shut down master
 
   /* Internal to slave */
-  S2S_GOT_MASTER,       // Used when looking up master with ZooKeeper
-  S2S_GET_STATE,        // Used by web UI
+  S2S_GOT_MASTER,        // Used when looking up master with ZooKeeper
+  S2S_GET_STATE,         // Used by web UI
   S2S_GET_STATE_REPLY,
-  S2S_CHILD_EXIT,       // Sent by reaper process
-  S2S_SHUTDOWN,         // Used in tests to shut down slave
+  S2S_CHILD_EXIT,        // Sent by reaper process
+  S2S_SHUTDOWN,          // Used in tests to shut down slave
 
   MESOS_MESSAGES,
 };
@@ -332,7 +333,10 @@ TUPLE(M2M_GET_STATE_REPLY,
 
 TUPLE(M2M_TIMER_TICK,
       ());
-       
+
+TUPLE(M2M_FRAMEWORK_EXPIRED,
+      (FrameworkID));
+
 TUPLE(M2M_SHUTDOWN,
       ());
 

Modified: incubator/mesos/trunk/src/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave.cpp?rev=1131961&r1=1131960&r2=1131961&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave.cpp Sun Jun  5 06:42:10 2011
@@ -511,8 +511,17 @@ string Slave::getWorkDirectory(Framework
   } else {
     workDir = "work";
   }
+
+  // TODO(benh): Clean this up, check for errors, etc.
+  time_t rawtime;
+  struct tm* timeinfo;
+  time(&rawtime);
+  timeinfo = localtime(&rawtime);
+  char timestr[32];
+  strftime(timestr, sizeof(timestr), "%Y-%m-%d-%H:%M", timeinfo);
+
   ostringstream fwDir;
-  fwDir << workDir << "/slave-" << id << "/fw-" << fid;
+  fwDir << workDir << "/slave-" << id << "/fw-" << fid << "/" << timestr;
   return fwDir.str();
 }