You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2011/06/05 11:18:22 UTC

svn commit: r1132277 - in /incubator/mesos/trunk/src: detector/ exec/ master/ messaging/ sched/ slave/ tests/

Author: benh
Date: Sun Jun  5 09:18:22 2011
New Revision: 1132277

URL: http://svn.apache.org/viewvc?rev=1132277&view=rev
Log:
Finished changes to accommodate the latest libprocess updates, and eliminated the switch-style receive loops in Slave and ExecutorProcess.

Modified:
    incubator/mesos/trunk/src/detector/detector.cpp
    incubator/mesos/trunk/src/exec/exec.cpp
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/messaging/messages.cpp
    incubator/mesos/trunk/src/messaging/messages.hpp
    incubator/mesos/trunk/src/sched/sched.cpp
    incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/master_test.cpp

Modified: incubator/mesos/trunk/src/detector/detector.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/detector/detector.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/detector/detector.cpp (original)
+++ incubator/mesos/trunk/src/detector/detector.cpp Sun Jun  5 09:18:22 2011
@@ -46,7 +46,7 @@ protected:
     if (name() == process::TIMEOUT) {
       LOG(ERROR) << "Have not heard back from ZooKeeper after trying to "
                  << "(automagically) reconnect";
-      MesosProcess<class T>::post(pid, MASTER_DETECTION_FAILURE);
+      process::post(pid, MASTER_DETECTION_FAILURE);
     }
   }
 
@@ -475,7 +475,7 @@ void ZooKeeperMasterDetector::detectMast
 
   // No master present (lost or possibly hasn't come up yet).
   if (masterSeq.empty()) {
-    MesosProcess<class T>::post(pid, NO_MASTER_DETECTED);
+    process::post(pid, NO_MASTER_DETECTED);
   } else if (masterSeq != currentMasterSeq) {
     currentMasterSeq = masterSeq;
     currentMasterPID = lookupMasterPID(masterSeq); 
@@ -483,7 +483,7 @@ void ZooKeeperMasterDetector::detectMast
     // While trying to get the master PID, master might have crashed,
     // so PID might be empty.
     if (currentMasterPID == UPID()) {
-      MesosProcess<class T>::post(pid, NO_MASTER_DETECTED);
+      process::post(pid, NO_MASTER_DETECTED);
     } else {
       MSG<NEW_MASTER_DETECTED> msg;
       msg.set_pid(currentMasterPID);

Modified: incubator/mesos/trunk/src/exec/exec.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/exec/exec.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/exec/exec.cpp (original)
+++ incubator/mesos/trunk/src/exec/exec.cpp Sun Jun  5 09:18:22 2011
@@ -41,9 +41,26 @@ public:
                   const ExecutorID& _executorId, bool _local)
     : slave(_slave), driver(_driver), executor(_executor),
       frameworkId(_frameworkId), executorId(_executorId),
-      local(_local), terminate(false) {}
+      local(_local), terminate(false)
+  {
+    install(S2E_REGISTER_REPLY, &ExecutorProcess::registerReply,
+            &ExecutorRegisteredMessage::args);
+
+    install(S2E_RUN_TASK, &ExecutorProcess::runTask,
+            &RunTaskMessage::task);
+
+    install(S2E_KILL_TASK, &ExecutorProcess::killTask,
+            &KillTaskMessage::task_id);
+
+    install(S2E_FRAMEWORK_MESSAGE, &ExecutorProcess::frameworkMessage,
+            &FrameworkMessageMessage::message);
+
+    install(S2E_KILL_EXECUTOR, &ExecutorProcess::killExecutor);
 
-  ~ExecutorProcess() {}
+    install(process::EXITED, &ExecutorProcess::exited);
+  }
+
+  virtual ~ExecutorProcess() {}
 
 protected:
   virtual void operator () ()
@@ -59,108 +76,76 @@ protected:
     send(slave, out);
 
     while(true) {
-      // TODO(benh): Is there a better way to architect this code? In
-      // particular, if the executor blocks in a callback, we can't
-      // process any other messages. This is especially tricky if a
-      // slave dies since we won't handle the PROCESS_EXIT message in
-      // a timely manner (if at all).
-
       // Check for terminate in the same way as SchedulerProcess. See
       // comments there for an explanation of why this is necessary.
-      if (terminate)
-        return;
+      if (terminate) return;
 
-      switch (receive(2)) {
-        case S2E_REGISTER_REPLY: {
-          const MSG<S2E_REGISTER_REPLY>& msg = message();
-
-          slaveId = msg.args().slave_id();
-
-          VLOG(1) << "Executor registered on slave " << slaveId;
-
-          process::invoke(bind(&Executor::init, executor, driver,
-                               cref(msg.args())));
-          break;
-        }
-
-        case S2E_RUN_TASK: {
-          const MSG<S2E_RUN_TASK>& msg = message();
-
-          const TaskDescription& task = msg.task();
-
-          VLOG(1) << "Executor asked to run a task " << task.task_id();
-
-          MSG<E2S_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(frameworkId);
-          TaskStatus* status = out.mutable_status();
-          status->mutable_task_id()->MergeFrom(task.task_id());
-          status->mutable_slave_id()->MergeFrom(slaveId);
-          status->set_state(TASK_RUNNING);
-          send(slave, out);
-
-          process::invoke(bind(&Executor::launchTask, executor, driver,
-                               cref(task)));
-          break;
-        }
-
-        case S2E_KILL_TASK: {
-          const MSG<S2E_KILL_TASK>& msg = message();
-
-          VLOG(1) << "Executor asked to kill task " << msg.task_id();
-
-          process::invoke(bind(&Executor::killTask, executor, driver,
-                               cref(msg.task_id())));
-          break;
-        }
-
-        case S2E_FRAMEWORK_MESSAGE: {
-          const MSG<S2E_FRAMEWORK_MESSAGE>& msg = message();
-
-          VLOG(1) << "Executor passed message";
-
-          const FrameworkMessage& message = msg.message();
-          process::invoke(bind(&Executor::frameworkMessage, executor, driver,
-                               cref(message)));
-          break;
-        }
-
-        case S2E_KILL_EXECUTOR: {
-          VLOG(1) << "Executor asked to shutdown";
-          process::invoke(bind(&Executor::shutdown, executor, driver));
-          if (!local)
-            exit(0);
-          else
-            return;
-        }
-
-        case PROCESS_EXIT: {
-          VLOG(1) << "Slave exited, trying to shutdown";
-
-          // TODO: Pass an argument to shutdown to tell it this is abnormal?
-          process::invoke(bind(&Executor::shutdown, executor, driver));
-
-          // This is a pretty bad state ... no slave is left. Rather
-          // than exit lets kill our process group (which includes
-          // ourself) hoping to clean up any processes this executor
-          // launched itself.
-          // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
-          if (!local)
-            killpg(0, SIGKILL);
-          else
-            return;
-        }
-
-        case PROCESS_TIMEOUT: {
-          break;
-        }
-
-        default: {
-          VLOG(1) << "Received unknown message ID " << msgid()
-                  << " from " << from();
-          // TODO: Is this serious enough to exit?
-          break;
-        }
-      }
+      serve(0, true);
+    }
+  }
+
+  void registerReply(const ExecutorArgs& args)
+  {
+    VLOG(1) << "Executor registered on slave " << args.slave_id();
+    slaveId = args.slave_id();
+    process::invoke(bind(&Executor::init, executor, driver, cref(args)));
+  }
+
+  void runTask(const TaskDescription& task)
+  {
+    VLOG(1) << "Executor asked to run a task " << task.task_id();
+
+    MSG<E2S_STATUS_UPDATE> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus* status = out.mutable_status();
+    status->mutable_task_id()->MergeFrom(task.task_id());
+    status->mutable_slave_id()->MergeFrom(slaveId);
+    status->set_state(TASK_RUNNING);
+    send(slave, out);
+
+    process::invoke(bind(&Executor::launchTask, executor, driver, cref(task)));
+  }
+
+  void killTask(const TaskID& taskId)
+  {
+    VLOG(1) << "Executor asked to kill task " << taskId;
+    process::invoke(bind(&Executor::killTask, executor, driver, cref(taskId)));
+  }
+
+  void frameworkMessage(const FrameworkMessage& message)
+  {
+    VLOG(1) << "Executor received message";
+    process::invoke(bind(&Executor::frameworkMessage, executor, driver,
+                         cref(message)));
+  }
+
+  void killExecutor()
+  {
+    VLOG(1) << "Executor asked to shutdown";
+    process::invoke(bind(&Executor::shutdown, executor, driver));
+    if (!local) {
+      exit(0);
+    } else {
+      terminate = true;  // Ends operator()'s serve loop, as exited() does.
+    }
+  }
+
+  void exited()
+  {
+    VLOG(1) << "Slave exited, trying to shutdown";
+
+    // TODO: Pass an argument to shutdown to tell it this is abnormal?
+    process::invoke(bind(&Executor::shutdown, executor, driver));
+
+    // This is a pretty bad state ... no slave is left. Rather
+    // than exit lets kill our process group (which includes
+    // ourself) hoping to clean up any processes this executor
+    // launched itself.
+    // TODO(benh): Maybe do a SIGTERM and then later do a SIGKILL?
+    if (!local) {
+      killpg(0, SIGKILL);
+    } else {
+      terminate = true;
     }
   }
 
@@ -206,6 +191,8 @@ MesosExecutorDriver::MesosExecutorDriver
 
 MesosExecutorDriver::~MesosExecutorDriver()
 {
+  // Just as in SchedulerProcess, we might wait here indefinitely if
+  // MesosExecutorDriver::stop has not been invoked.
   process::wait(process->self());
   delete process;
 
@@ -295,7 +282,10 @@ int MesosExecutorDriver::stop()
     return -1;
   }
 
+  CHECK(process != NULL);
+
   process->terminate = true;
+  process::post(process->self(), process::TERMINATE);
 
   running = false;
 
@@ -308,8 +298,10 @@ int MesosExecutorDriver::stop()
 int MesosExecutorDriver::join()
 {
   Lock lock(&mutex);
-  while (running)
+
+  while (running) {
     pthread_cond_wait(&cond, &mutex);
+  }
 
   return 0;
 }
@@ -331,6 +323,8 @@ int MesosExecutorDriver::sendStatusUpdat
     return -1;
   }
 
+  CHECK(process != NULL);
+
   // Validate that they set the correct slave ID.
   if (!(process->slaveId == status.slave_id())) {
     return -1;
@@ -355,6 +349,8 @@ int MesosExecutorDriver::sendFrameworkMe
     return -1;
   }
 
+  CHECK(process != NULL);
+
   // Validate that they set the correct slave ID and executor ID.
   if (!(process->slaveId == message.slave_id())) {
     return -1;

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Sun Jun  5 09:18:22 2011
@@ -55,7 +55,7 @@ protected:
       receive(1);
       if (name() == process::TIMEOUT) {
         dispatch(master, &Master::timerTick);
-      } else if (name() == process::EXIT) {
+      } else if (name() == process::EXITED) {
 	return;
       }
     }
@@ -323,7 +323,8 @@ void Master::operator () ()
               << "we haven't received an identifier yet!";  
   }
 
-  const MSG<GOT_MASTER_TOKEN>& msg = message();
+  MSG<GOT_MASTER_TOKEN> msg;
+  msg.ParseFromString(body());
 
   // The master ID is comprised of the current date and some ephemeral
   // token (e.g., determined by ZooKeeper).
@@ -342,15 +343,15 @@ void Master::operator () ()
 
   while (true) {
     serve();
-    if (msgid() == PROCESS_TERMINATE) {
+    if (name() == process::TERMINATE) {
       LOG(INFO) << "Asked to terminate by " << from();
       foreachpair (_, Slave* slave, slaves) {
         send(slave->pid, process::TERMINATE);
       }
       break;
     } else {
-      LOG(INFO) << "Unhandled message " << name()
-                << " from " << from();
+      LOG(WARNING) << "Dropping unknown message '" << name() << "'"
+                   << " from: " << from();
     }
   }
 }
@@ -427,7 +428,7 @@ void Master::initialize()
   install(SH2M_HEARTBEAT, &Master::slaveHeartbeat,
           &HeartbeatMessage::slave_id);
 
-  install(PROCESS_EXIT, &Master::processExited);
+  install(process::EXITED, &Master::exited);
 
   // Install HTTP request handlers.
   Process<Master>::install("vars", &Master::vars);
@@ -983,7 +984,7 @@ void Master::frameworkExpired(const Fram
 }
 
 
-void Master::processExited()
+void Master::exited()
 {
   // TODO(benh): Could we get PROCESS_EXIT from a network partition?
   LOG(INFO) << "Process exited: " << from();

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Sun Jun  5 09:18:22 2011
@@ -162,7 +162,7 @@ public:
   void slaveHeartbeat(const SlaveID& slaveId);
   void timerTick();
   void frameworkExpired(const FrameworkID& frameworkId);
-  void processExited();
+  void exited();
 
   process::Promise<process::HttpResponse> vars(const process::HttpRequest& request);
 
@@ -263,7 +263,7 @@ protected:
       if (name() == process::TIMEOUT) {
         process::dispatch(master, &Master::frameworkExpired, frameworkId);
         return;
-      } else if (name() == process::EXIT || name() == process::TERMINATE) {
+      } else if (name() == process::EXITED || name() == process::TERMINATE) {
         return;
       }
     }

Modified: incubator/mesos/trunk/src/messaging/messages.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.cpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.cpp Sun Jun  5 09:18:22 2011
@@ -1,99 +1,76 @@
 #include "messaging/messages.hpp"
 
+#define DEFINE_MESSAGE(name)                    \
+  char name[] = #name
 
 namespace mesos { namespace internal {
 
-boost::unordered_map<std::string, MSGID> ids;
-boost::unordered_map<MSGID, std::string> names;
-
-
-static struct Initialization
-{
-  Initialization()
-  {
-    ids[process::EXIT] = PROCESS_EXIT;
-    names[PROCESS_EXIT] = process::EXIT;
-
-    ids[process::TIMEOUT] = PROCESS_TIMEOUT;
-    names[PROCESS_TIMEOUT] = process::TIMEOUT;
-
-    ids[process::TERMINATE] = PROCESS_TERMINATE;
-    names[PROCESS_TERMINATE] = process::TERMINATE;
-  }
-} __initialization__;
-
-
-struct InitializeMessage
-{
-  InitializeMessage(const std::string& name, MSGID id)
-  {
-    ids[name] = id;
-    names[id] = name;
-  }
-};
-
-
-#define INITIALIZE_MESSAGE(ID)               \
-    static InitializeMessage __ ## ID(#ID, ID)
-
-
-INITIALIZE_MESSAGE(F2M_REGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
-INITIALIZE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
-INITIALIZE_MESSAGE(F2M_REVIVE_OFFERS);
-INITIALIZE_MESSAGE(F2M_KILL_TASK);
-INITIALIZE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(F2M_STATUS_UPDATE_ACK);
-
-INITIALIZE_MESSAGE(M2F_REGISTER_REPLY);
-INITIALIZE_MESSAGE(M2F_RESOURCE_OFFER);
-INITIALIZE_MESSAGE(M2F_RESCIND_OFFER);
-INITIALIZE_MESSAGE(M2F_STATUS_UPDATE);
-INITIALIZE_MESSAGE(M2F_LOST_SLAVE);
-INITIALIZE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(M2F_ERROR);
-
-INITIALIZE_MESSAGE(S2M_REGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_REREGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_UNREGISTER_SLAVE);
-INITIALIZE_MESSAGE(S2M_STATUS_UPDATE);
-INITIALIZE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(S2M_EXITED_EXECUTOR);
-
-INITIALIZE_MESSAGE(SH2M_HEARTBEAT);
-  
-INITIALIZE_MESSAGE(M2S_REGISTER_REPLY);
-INITIALIZE_MESSAGE(M2S_REREGISTER_REPLY);
-INITIALIZE_MESSAGE(M2S_RUN_TASK);
-INITIALIZE_MESSAGE(M2S_KILL_TASK);
-INITIALIZE_MESSAGE(M2S_KILL_FRAMEWORK);
-INITIALIZE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(M2S_UPDATE_FRAMEWORK);
-INITIALIZE_MESSAGE(M2S_STATUS_UPDATE_ACK);
-
-INITIALIZE_MESSAGE(E2S_REGISTER_EXECUTOR);
-INITIALIZE_MESSAGE(E2S_STATUS_UPDATE);
-INITIALIZE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
-
-INITIALIZE_MESSAGE(S2E_REGISTER_REPLY);
-INITIALIZE_MESSAGE(S2E_RUN_TASK);
-INITIALIZE_MESSAGE(S2E_KILL_TASK);
-INITIALIZE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
-INITIALIZE_MESSAGE(S2E_KILL_EXECUTOR);
+// From framework to master.
+DEFINE_MESSAGE(F2M_REGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_REREGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_UNREGISTER_FRAMEWORK);
+DEFINE_MESSAGE(F2M_RESOURCE_OFFER_REPLY);
+DEFINE_MESSAGE(F2M_REVIVE_OFFERS);
+DEFINE_MESSAGE(F2M_KILL_TASK);
+DEFINE_MESSAGE(F2M_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(F2M_STATUS_UPDATE_ACK);
+
+// From master to framework.
+DEFINE_MESSAGE(M2F_REGISTER_REPLY);
+DEFINE_MESSAGE(M2F_RESOURCE_OFFER);
+DEFINE_MESSAGE(M2F_RESCIND_OFFER);
+DEFINE_MESSAGE(M2F_STATUS_UPDATE);
+DEFINE_MESSAGE(M2F_LOST_SLAVE);
+DEFINE_MESSAGE(M2F_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(M2F_ERROR);
+
+// From slave to master.
+DEFINE_MESSAGE(S2M_REGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_REREGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_UNREGISTER_SLAVE);
+DEFINE_MESSAGE(S2M_STATUS_UPDATE);
+DEFINE_MESSAGE(S2M_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(S2M_EXITED_EXECUTOR);
+
+// From slave heart to master.
+DEFINE_MESSAGE(SH2M_HEARTBEAT);
+
+// From master to slave.
+DEFINE_MESSAGE(M2S_REGISTER_REPLY);
+DEFINE_MESSAGE(M2S_REREGISTER_REPLY);
+DEFINE_MESSAGE(M2S_RUN_TASK);
+DEFINE_MESSAGE(M2S_KILL_TASK);
+DEFINE_MESSAGE(M2S_KILL_FRAMEWORK);
+DEFINE_MESSAGE(M2S_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(M2S_UPDATE_FRAMEWORK);
+DEFINE_MESSAGE(M2S_STATUS_UPDATE_ACK);
+
+// From executor to slave.
+DEFINE_MESSAGE(E2S_REGISTER_EXECUTOR);
+DEFINE_MESSAGE(E2S_STATUS_UPDATE);
+DEFINE_MESSAGE(E2S_FRAMEWORK_MESSAGE);
+
+// From slave to executor.
+DEFINE_MESSAGE(S2E_REGISTER_REPLY);
+DEFINE_MESSAGE(S2E_RUN_TASK);
+DEFINE_MESSAGE(S2E_KILL_TASK);
+DEFINE_MESSAGE(S2E_FRAMEWORK_MESSAGE);
+DEFINE_MESSAGE(S2E_KILL_EXECUTOR);
 
 #ifdef __sun__
-INITIALIZE_MESSAGE(PD2S_REGISTER_PROJD);
-INITIALIZE_MESSAGE(PD2S_PROJD_READY);
-INITIALIZE_MESSAGE(S2PD_UPDATE_RESOURCES);
-INITIALIZE_MESSAGE(S2PD_KILL_ALL);
+// From projd to slave.
+DEFINE_MESSAGE(PD2S_REGISTER_PROJD);
+DEFINE_MESSAGE(PD2S_PROJD_READY);
+
+// From slave to projd.
+DEFINE_MESSAGE(S2PD_UPDATE_RESOURCES);
+DEFINE_MESSAGE(S2PD_KILL_ALL);
 #endif // __sun__
 
-INITIALIZE_MESSAGE(GOT_MASTER_TOKEN);
-INITIALIZE_MESSAGE(NEW_MASTER_DETECTED);
-INITIALIZE_MESSAGE(NO_MASTER_DETECTED);
-INITIALIZE_MESSAGE(MASTER_DETECTION_FAILURE);
-
-INITIALIZE_MESSAGE(MESOS_MSGID);
+// From master detector to processes.
+DEFINE_MESSAGE(GOT_MASTER_TOKEN);
+DEFINE_MESSAGE(NEW_MASTER_DETECTED);
+DEFINE_MESSAGE(NO_MASTER_DETECTED);
+DEFINE_MESSAGE(MASTER_DETECTION_FAILURE);
 
 }} // namespace mesos { namespace internal {

Modified: incubator/mesos/trunk/src/messaging/messages.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/messaging/messages.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/messaging/messages.hpp (original)
+++ incubator/mesos/trunk/src/messaging/messages.hpp Sun Jun  5 09:18:22 2011
@@ -15,124 +15,106 @@
 
 #include <boost/unordered_map.hpp>
 
+#include "common/utils.hpp"
+
 #include "messaging/messages.pb.h"
 
 
 namespace mesos { namespace internal {
 
-enum MSGID {
-  // Artifacts from libprocess.
-  PROCESS_TIMEOUT,
-  PROCESS_EXIT,
-  PROCESS_TERMINATE,
-
-  // From framework to master.
-  F2M_REGISTER_FRAMEWORK,
-  F2M_REREGISTER_FRAMEWORK,
-  F2M_UNREGISTER_FRAMEWORK,
-  F2M_RESOURCE_OFFER_REPLY,
-  F2M_REVIVE_OFFERS,
-  F2M_KILL_TASK,
-  F2M_FRAMEWORK_MESSAGE,
-  F2M_STATUS_UPDATE_ACK,
-  
-  // From master to framework.
-  M2F_REGISTER_REPLY,
-  M2F_RESOURCE_OFFER,
-  M2F_RESCIND_OFFER,
-  M2F_STATUS_UPDATE,
-  M2F_LOST_SLAVE,
-  M2F_FRAMEWORK_MESSAGE,
-  M2F_ERROR,
-  
-  // From slave to master.
-  S2M_REGISTER_SLAVE,
-  S2M_REREGISTER_SLAVE,
-  S2M_UNREGISTER_SLAVE,
-  S2M_STATUS_UPDATE,
-  S2M_FRAMEWORK_MESSAGE,
-  S2M_EXITED_EXECUTOR,
-
-  // From slave heart to master.
-  SH2M_HEARTBEAT,
-  
-  // From master to slave.
-  M2S_REGISTER_REPLY,
-  M2S_REREGISTER_REPLY,
-  M2S_RUN_TASK,
-  M2S_KILL_TASK,
-  M2S_KILL_FRAMEWORK,
-  M2S_FRAMEWORK_MESSAGE,
-  M2S_UPDATE_FRAMEWORK,
-  M2S_STATUS_UPDATE_ACK,
-
-  // From executor to slave.
-  E2S_REGISTER_EXECUTOR,
-  E2S_STATUS_UPDATE,
-  E2S_FRAMEWORK_MESSAGE,
-
-  // From slave to executor.
-  S2E_REGISTER_REPLY,
-  S2E_RUN_TASK,
-  S2E_KILL_TASK,
-  S2E_FRAMEWORK_MESSAGE,
-  S2E_KILL_EXECUTOR,
-
-#ifdef __sun__
-  // From projd to slave.
-  PD2S_REGISTER_PROJD,
-  PD2S_PROJECT_READY,
-
-  // From slave to projd.
-  S2PD_UPDATE_RESOURCES,
-  S2PD_KILL_ALL,
-#endif // __sun__
-
-  // From master detector to processes.
-  GOT_MASTER_TOKEN,
-  NEW_MASTER_DETECTED,
-  NO_MASTER_DETECTED,
-  MASTER_DETECTION_FAILURE,
-
-  MESOS_MSGID
-};
-
-
-// To couple a MSGID with a protocol buffer we use a templated class
-// that extends the necessary protocol buffer type (this also allows
-// the code to be better isolated from protocol buffer naming). While
-// protocol buffers are allegedly not meant to be inherited, we
-// decided this was an acceptable option since we don't add any new
-// functionality (or do any thing with the existing functionality).
+// To couple a message name with a protocol buffer we use a templated
+// class that extends the necessary protocol buffer type (this also
+// allows the code to be better isolated from protocol buffer
+// naming). While protocol buffers are allegedly not meant to be
+// inherited, we decided this was an acceptable option since we don't
+// add any new functionality (or do any thing with the existing
+// functionality).
 //
 // To add another message that uses a protocol buffer you need to
 // provide a specialization of the Message class (i.e., using the
 // MESSAGE macro defined below).
-template <MSGID ID>
+template <const char* name>
 class MSG;
 
-#define MESSAGE(ID, T)                          \
-  template <>                                   \
-  class MSG<ID> : public T {}
+#define MESSAGE1(name)                          \
+    extern char name[]
 
+#define MESSAGE2(name, T)                           \
+    extern char name[];                             \
+    template <>                                     \
+    class MSG<name> : public T {}
+                                                                                
+#define MESSAGE(...)                                            \
+    CONCAT(MESSAGE, VA_NUM_ARGS(__VA_ARGS__))(__VA_ARGS__)
 
-class AnyMessage
-{
-public:
-  AnyMessage(const std::string& data_)
-    : data(data_) {}
 
-  template <MSGID ID>
-  operator MSG<ID> () const
-  {
-    MSG<ID> msg;
-    msg.ParseFromString(data);
-    return msg;
-  }
+// From framework to master.
+MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
+MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
+MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
+MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
+MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
+MESSAGE(F2M_KILL_TASK, KillTaskMessage);
+MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
 
-private:
-  std::string data;
-};
+// From master to framework.
+MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
+MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
+MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
+MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
+MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(M2F_ERROR, FrameworkErrorMessage);
+
+// From slave to master.
+MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
+MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
+MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
+MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
+
+// From slave heart to master.
+MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
+
+// From master to slave.
+MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
+MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
+MESSAGE(M2S_RUN_TASK, RunTaskMessage);
+MESSAGE(M2S_KILL_TASK, KillTaskMessage);
+MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
+MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
+MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
+
+// From executor to slave.
+MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
+MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
+MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+
+// From slave to executor.
+MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
+MESSAGE(S2E_RUN_TASK, RunTaskMessage);
+MESSAGE(S2E_KILL_TASK, KillTaskMessage);
+MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
+MESSAGE(S2E_KILL_EXECUTOR);
+
+#ifdef __sun__
+// From projd to slave.
+MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
+MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
+
+// From slave to projd.
+MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
+MESSAGE(S2PD_KILL_ALL);
+#endif // __sun__
+
+// From master detector to processes.
+MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
+MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
+MESSAGE(NO_MASTER_DETECTED);
+MESSAGE(MASTER_DETECTION_FAILURE);
 
 
 // Type conversions helpful for changing between protocol buffer types
@@ -156,11 +138,6 @@ std::vector<T> convert(const google::pro
 }
 
 
-// Mapping between message names to message ids.
-extern boost::unordered_map<std::string, MSGID> ids;
-extern boost::unordered_map<MSGID, std::string> names;
-
-
 template <typename T>
 class MesosProcess : public process::Process<T>
 {
@@ -170,99 +147,44 @@ public:
 
   virtual ~MesosProcess() {}
 
-  static void post(const process::UPID& to, const std::string& name)
-  {
-    process::post(to, name);
-  }
-
-  static void post(const process::UPID& to, MSGID id)
+  template <const char *name>
+  static void post(const process::UPID& to, const MSG<name>& msg)
   {
-    CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
-    process::post(to, names[id]);
-  }
-
-  template <MSGID ID>
-  static void post(const process::UPID& to, const MSG<ID>& msg)
-  {
-    CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
     std::string data;
     msg.SerializeToString(&data);
-    process::post(to, names[ID], data.data(), data.size());
+    process::post(to, name, data.data(), data.size());
   }
 
 protected:
-  AnyMessage message() const
-  {
-    return AnyMessage(process::Process<T>::body());
-  }
-
-  MSGID msgid() const
-  {
-    CHECK(ids.count(process::Process<T>::name()) > 0)
-      << "Missing MSGID for '" << process::Process<T>::name() << "'";
-    return ids[process::Process<T>::name()];
-  }
-
   void send(const process::UPID& to, const std::string& name)
   {
     process::Process<T>::send(to, name);
   }
 
-  void send(const process::UPID& to, MSGID id)
+  template <const char* name>
+  void send(const process::UPID& to, const MSG<name>& msg)
   {
-    CHECK(names.count(id) > 0) << "Missing name for MSGID " << id;
-    process::Process<T>::send(to, names[id]);
-  }
-
-  template <MSGID ID>
-  void send(const process::UPID& to, const MSG<ID>& msg)
-  {
-    CHECK(names.count(ID) > 0) << "Missing name for MSGID " << ID;
     std::string data;
     msg.SerializeToString(&data);
-    process::Process<T>::send(to, names[ID], data.data(), data.size());
-  }
-
-  MSGID receive(double secs = 0)
-  {
-    while (true) {
-      process::Process<T>::receive(secs);
-      if (ids.count(process::Process<T>::name()) > 0) {
-        return ids[process::Process<T>::name()];
-      } else {
-        LOG(WARNING) << "Dropping unknown message '"
-                     << process::Process<T>::name() << "'"
-                     << " from: " << process::Process<T>::from()
-                     << " to: " << process::Process<T>::self();
-      }
-    }
+    process::Process<T>::send(to, name, data.data(), data.size());
   }
 
-  MSGID serve(double secs = 0)
+  const std::string& serve(double secs = 0, bool once = false)
   {
-    while (true) {
-      process::Process<T>::serve(secs);
-      if (ids.count(process::Process<T>::name()) > 0) {
-        // Check if this has been bound and invoke the handler.
-        if (handlers.count(process::Process<T>::name()) > 0) {
-          handlers[process::Process<T>::name()](process::Process<T>::body());
-        } else {
-          return ids[process::Process<T>::name()];
-        }
-      } else {
-        LOG(WARNING) << "Dropping unknown message '"
-                     << process::Process<T>::name() << "'"
-                     << " from: " << process::Process<T>::from()
-                     << " to: " << process::Process<T>::self();
+    do {  // Dispatch to installed handlers; stop after one message if 'once'.
+      process::Process<T>::serve(secs, once);
+      if (handlers.count(process::Process<T>::name()) == 0) {
+        break;  // No installed handler: hand the message name to the caller.
       }
-    }
+      handlers[process::Process<T>::name()](process::Process<T>::body());
+    } while (!once);
+    return process::Process<T>::name();  // Must not flow off the end when 'once'.
   }
 
-  void install(MSGID id, void (T::*method)())
+  void install(const std::string& name, void (T::*method)())
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&MesosProcess<T>::handler0, t,
                      method,
                      std::tr1::placeholders::_1);
@@ -270,12 +192,11 @@ protected:
 
   template <typename PB,
             typename P1, typename P1C>
-  void install(MSGID id, void (T::*method)(P1C),
+  void install(const std::string& name, void (T::*method)(P1C),
                P1 (PB::*param1)() const)
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&handler1<PB, P1, P1C>, t,
                      method, param1,
                      std::tr1::placeholders::_1);
@@ -284,13 +205,12 @@ protected:
   template <typename PB,
             typename P1, typename P1C,
             typename P2, typename P2C>
-  void install(MSGID id, void (T::*method)(P1C, P2C),
+  void install(const std::string& name, void (T::*method)(P1C, P2C),
                P1 (PB::*p1)() const,
                P2 (PB::*p2)() const)
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&handler2<PB, P1, P1C, P2, P2C>, t,
                      method, p1, p2,
                      std::tr1::placeholders::_1);
@@ -300,15 +220,14 @@ protected:
             typename P1, typename P1C,
             typename P2, typename P2C,
             typename P3, typename P3C>
-  void install(MSGID id,
+  void install(const std::string& name,
                void (T::*method)(P1C, P2C, P3C),
                P1 (PB::*p1)() const,
                P2 (PB::*p2)() const,
                P3 (PB::*p3)() const)
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&handler3<PB, P1, P1C, P2, P2C, P3, P3C>, t,
                      method, p1, p2, p3,
                      std::tr1::placeholders::_1);
@@ -319,7 +238,7 @@ protected:
             typename P2, typename P2C,
             typename P3, typename P3C,
             typename P4, typename P4C>
-  void install(MSGID id,
+  void install(const std::string& name,
                void (T::*method)(P1C, P2C, P3C, P4C),
                P1 (PB::*p1)() const,
                P2 (PB::*p2)() const,
@@ -327,8 +246,7 @@ protected:
                P4 (PB::*p4)() const)
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&handler4<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C>, t,
                      method, p1, p2, p3, p4,
                      std::tr1::placeholders::_1);
@@ -340,7 +258,7 @@ protected:
             typename P3, typename P3C,
             typename P4, typename P4C,
             typename P5, typename P5C>
-  void install(MSGID id,
+  void install(const std::string& name,
                void (T::*method)(P1C, P2C, P3C, P4C, P5C),
                P1 (PB::*p1)() const,
                P2 (PB::*p2)() const,
@@ -349,8 +267,7 @@ protected:
                P5 (PB::*p5)() const)
   {
     T* t = static_cast<T*>(this);
-    CHECK(names.count(id) > 0);
-    handlers[names[id]] =
+    handlers[name] =
       std::tr1::bind(&handler5<PB, P1, P1C, P2, P2C, P3, P3C, P4, P4C, P5, P5C>, t,
                      method, p1, p2, p3, p4, p5,
                      std::tr1::placeholders::_1);
@@ -370,7 +287,7 @@ private:
                        const std::string& data)
   {
     PB pb;
-    pb.ParseFromArray(data.data(), data.size());
+    pb.ParseFromString(data);
     if (pb.IsInitialized()) {
       (t->*method)(convert((&pb->*p1)()));
     } else {
@@ -388,7 +305,7 @@ private:
                        const std::string& data)
   {
     PB pb;
-    pb.ParseFromArray(data.data(), data.size());
+    pb.ParseFromString(data);
     if (pb.IsInitialized()) {
       (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()));
     } else {
@@ -408,7 +325,7 @@ private:
                        const std::string& data)
   {
     PB pb;
-    pb.ParseFromArray(data.data(), data.size());
+    pb.ParseFromString(data);
     if (pb.IsInitialized()) {
       (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
                    convert((&pb->*p3)()));
@@ -431,7 +348,7 @@ private:
                        const std::string& data)
   {
     PB pb;
-    pb.ParseFromArray(data.data(), data.size());
+    pb.ParseFromString(data);
     if (pb.IsInitialized()) {
       (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
                    convert((&pb->*p3)()), convert((&pb->*p4)()));
@@ -456,7 +373,7 @@ private:
                        const std::string& data)
   {
     PB pb;
-    pb.ParseFromArray(data.data(), data.size());
+    pb.ParseFromString(data);
     if (pb.IsInitialized()) {
       (t->*method)(convert((&pb->*p1)()), convert((&pb->*p2)()),
                    convert((&pb->*p3)()), convert((&pb->*p4)()),
@@ -470,60 +387,6 @@ private:
   boost::unordered_map<std::string, std::tr1::function<void (const std::string&)> > handlers;
 };
 
-
-MESSAGE(F2M_REGISTER_FRAMEWORK, RegisterFrameworkMessage);
-MESSAGE(F2M_REREGISTER_FRAMEWORK, ReregisterFrameworkMessage);
-MESSAGE(F2M_UNREGISTER_FRAMEWORK, UnregisterFrameworkMessage);
-MESSAGE(F2M_RESOURCE_OFFER_REPLY, ResourceOfferReplyMessage);
-MESSAGE(F2M_REVIVE_OFFERS, ReviveOffersMessage);
-MESSAGE(F2M_KILL_TASK, KillTaskMessage);
-MESSAGE(F2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(F2M_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-MESSAGE(M2F_REGISTER_REPLY, FrameworkRegisteredMessage);
-MESSAGE(M2F_RESOURCE_OFFER, ResourceOfferMessage);
-MESSAGE(M2F_RESCIND_OFFER, RescindResourceOfferMessage);
-MESSAGE(M2F_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(M2F_LOST_SLAVE, LostSlaveMessage);
-MESSAGE(M2F_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2F_ERROR, FrameworkErrorMessage);
-
-MESSAGE(S2M_REGISTER_SLAVE, RegisterSlaveMessage);
-MESSAGE(S2M_REREGISTER_SLAVE, ReregisterSlaveMessage);
-MESSAGE(S2M_UNREGISTER_SLAVE, UnregisterSlaveMessage);
-MESSAGE(S2M_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(S2M_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(S2M_EXITED_EXECUTOR, ExitedExecutorMessage);
-
-MESSAGE(SH2M_HEARTBEAT, HeartbeatMessage);
-  
-MESSAGE(M2S_REGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_REREGISTER_REPLY, SlaveRegisteredMessage);
-MESSAGE(M2S_RUN_TASK, RunTaskMessage);
-MESSAGE(M2S_KILL_TASK, KillTaskMessage);
-MESSAGE(M2S_KILL_FRAMEWORK, KillFrameworkMessage);
-MESSAGE(M2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-MESSAGE(M2S_UPDATE_FRAMEWORK, UpdateFrameworkMessage);
-MESSAGE(M2S_STATUS_UPDATE_ACK, StatusUpdateAckMessage);
-
-MESSAGE(E2S_REGISTER_EXECUTOR, RegisterExecutorMessage);
-MESSAGE(E2S_STATUS_UPDATE, StatusUpdateMessage);
-MESSAGE(E2S_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-MESSAGE(S2E_REGISTER_REPLY, ExecutorRegisteredMessage);
-MESSAGE(S2E_RUN_TASK, RunTaskMessage);
-MESSAGE(S2E_KILL_TASK, KillTaskMessage);
-MESSAGE(S2E_FRAMEWORK_MESSAGE, FrameworkMessageMessage);
-
-#ifdef __sun__
-MESSAGE(PD2S_REGISTER_PROJD, RegisterProjdMessage);
-MESSAGE(PD2S_PROJD_READY, ProjdReadyMessage);
-MESSAGE(S2PD_UPDATE_RESOURCES, ProjdUpdateResourcesMessage);
-#endif // __sun__
-
-MESSAGE(NEW_MASTER_DETECTED, NewMasterDetectedMessage);
-MESSAGE(GOT_MASTER_TOKEN, GotMasterTokenMessage);
-
 }} // namespace mesos { namespace internal {
 
 

Modified: incubator/mesos/trunk/src/sched/sched.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/sched/sched.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/sched/sched.cpp (original)
+++ incubator/mesos/trunk/src/sched/sched.cpp Sun Jun  5 09:18:22 2011
@@ -104,6 +104,8 @@ public:
     install(M2F_ERROR, &SchedulerProcess::error,
             &FrameworkErrorMessage::code,
             &FrameworkErrorMessage::message);
+
+    install(process::EXITED, &SchedulerProcess::exited);
   }
 
   virtual ~SchedulerProcess() {}
@@ -112,44 +114,26 @@ protected:
   virtual void operator () ()
   {
     while (true) {
-      // Rather than send a message to this process when it is time to
-      // terminate, we set a flag that gets re-read. Sending a message
-      // requires some sort of matching or priority reads that
-      // libprocess currently doesn't support. Note that this field is
-      // only read by this process, so we don't need to protect it in
-      // any way. In fact, using a lock to protect it (or for
-      // providing atomicity for cleanup, for example), might lead to
-      // deadlock with the client code because we already use a lock
-      // in SchedulerDriver. That being said, for now we make
-      // terminate 'volatile' to guarantee that each read is getting a
-      // fresh copy.
-      // TODO(benh): Do a coherent read so as to avoid using 'volatile'.
-      if (terminate)
-        return;
-
-      // TODO(benh): We need to break the receive every so often to
-      // check if 'terminate' has been set. It would be better to just
-      // send a message rather than have a timeout (see the comment
-      // above for why sending a message will still require us to use
-      // the terminate flag).
-      switch (serve(2)) {
-        case PROCESS_EXIT: {
-          // TODO(benh): Don't wait for a new master forever.
-          if (from() == master)
-            VLOG(1) << "Connection to master lost .. waiting for new master";
-          break;
-        }
-
-        case PROCESS_TIMEOUT: {
-          break;
-        }
-
-        default: {
-          VLOG(1) << "Received unknown message " << msgid()
-                  << " from " << from();
-          break;
-        }
-      }
+      // Sending a message to terminate this process is insufficient
+      // because that message might get queued behind a bunch of other
+      // messages. So, when it is time to terminate, we set a flag that
+      // gets re-read by this process after every message. In order to
+      // get this correct we must return from each invocation of
+      // 'serve', to check and see if terminate has been set. In
+      // addition, we need to send a dummy message right after we set
+      // terminate just in case there aren't any messages in the
+      // queue. Note that the terminate field is only read by this
+      // process, so we don't need to protect it in any way. In fact,
+      // using a lock to protect it (or for providing atomicity for
+      // cleanup, for example), might lead to deadlock with the client
+      // code because we already use a lock in SchedulerDriver. That
+      // being said, for now we make terminate 'volatile' to guarantee
+      // that each read is getting a fresh copy.
+      // TODO(benh): Do a coherent read so as to avoid using
+      // 'volatile'.
+      if (terminate) return;
+
+      serve(0, true);
     }
   }
 
@@ -285,6 +269,14 @@ protected:
                          cref(message)));
   }
 
+  void exited()
+  {
+    // TODO(benh): Don't wait for a new master forever.
+    if (from() == master) {
+      VLOG(1) << "Connection to master lost .. waiting for new master";
+    }
+  }
+
   void stop()
   {
     if (!active)
@@ -514,11 +506,12 @@ MesosSchedulerDriver::~MesosSchedulerDri
   // ultimately invokes this destructor). This deadlock is actually a
   // bug in the client code: provided that the SchedulerProcess class
   // _only_ makes calls into instances of Scheduler, then such a
-  // deadlock implies that the destructor got called from within a method
-  // of the Scheduler instance that is being destructed! Note
+  // deadlock implies that the destructor got called from within a
+  // method of the Scheduler instance that is being destructed! Note
   // that we could add a method to libprocess that told us whether or
   // not this was about to be deadlock, and possibly report this back
-  // to the user somehow.
+  // to the user somehow. Note that we will also wait forever if
+  // MesosSchedulerDriver::stop was never called.
   if (process != NULL) {
     process::wait(process->self());
     delete process;
@@ -549,14 +542,21 @@ int MesosSchedulerDriver::start()
     return -1;
   }
 
+  // We might have been running before, but have since stopped. Don't
+  // allow this driver to be used again (for now)!
+  if (process != NULL) {
+    return -1;
+  }
+
   // Set running here so we can recognize an exception from calls into
   // Java (via getFrameworkName or getExecutorInfo).
   running = true;
 
   // Get username of current user.
   passwd* passwd;
-  if ((passwd = getpwuid(getuid())) == NULL)
+  if ((passwd = getpwuid(getuid())) == NULL) {
     fatal("failed to get username information");
+  }
 
   // Set up framework info.
   FrameworkInfo framework;
@@ -565,8 +565,9 @@ int MesosSchedulerDriver::start()
   framework.mutable_executor()->MergeFrom(sched->getExecutorInfo(this));
 
   // Something invoked stop while we were in the scheduler, bail.
-  if (!running)
+  if (!running) {
     return -1;
+  }
 
   process = new SchedulerProcess(this, sched, frameworkId, framework);
 
@@ -604,7 +605,7 @@ int MesosSchedulerDriver::stop()
   if (process != NULL) {
     process::dispatch(process->self(), &SchedulerProcess::stop);
     process->terminate = true;
-    process = NULL;
+    process::post(process->self(), process::TERMINATE);
   }
 
   running = false;
@@ -624,8 +625,9 @@ int MesosSchedulerDriver::join()
 {
   Lock lock(&mutex);
 
-  while (running)
+  while (running) {
     pthread_cond_wait(&cond, &mutex);
+  }
 
   return 0;
 }

Modified: incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_based_isolation_module.cpp Sun Jun  5 09:18:22 2011
@@ -172,7 +172,7 @@ void ProcessBasedIsolationModule::Reaper
           }
         }
       }
-    } else if (name() == process::TERMINATE || name() == process::EXIT) {
+    } else if (name() == process::TERMINATE || name() == process::EXITED) {
       return;
     }
   }

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Sun Jun  5 09:18:22 2011
@@ -38,7 +38,10 @@ using std::vector;
 Slave::Slave(const Resources& _resources, bool _local,
              IsolationModule *_isolationModule)
   : resources(_resources), local(_local),
-    isolationModule(_isolationModule), heart(NULL) {}
+    isolationModule(_isolationModule), heart(NULL) 
+{
+  initialize();
+}
 
 
 Slave::Slave(const Configuration& _conf, bool _local,
@@ -48,6 +51,8 @@ Slave::Slave(const Configuration& _conf,
 {
   resources =
     Resources::parse(conf.get<string>("resources", "cpus:1;mem:1024"));
+
+  initialize();
 }
 
 
@@ -158,15 +163,16 @@ void Slave::operator () ()
   hostent* he = gethostbyname2(buf, AF_INET);
   string hostname = he->h_name;
 
-  // Get our public DNS name. Normally this is our hostname, but on EC2
-  // we look for the MESOS_PUBLIC_DNS environment variable. This allows
-  // the master to display our public name in its web UI.
+  // Check and see if we have a different public DNS name. Normally
+  // this is our hostname, but on EC2 we look for the MESOS_PUBLIC_DNS
+  // environment variable. This allows the master to display our
+  // public name in its web UI.
   string public_hostname = hostname;
   if (getenv("MESOS_PUBLIC_DNS") != NULL) {
     public_hostname = getenv("MESOS_PUBLIC_DNS");
   }
 
-  SlaveInfo slave;
+  // Initialize slave info.
   slave.set_hostname(hostname);
   slave.set_public_hostname(public_hostname);
   slave.mutable_resources()->MergeFrom(resources);
@@ -175,428 +181,471 @@ void Slave::operator () ()
   isolationModule->initialize(this);
 
   while (true) {
-    receive(1);
-    if (msgid() == PROCESS_TERMINATE) {
+    serve(1);
+    if (name() == process::TERMINATE) {
       LOG(INFO) << "Asked to shut down by " << from();
       foreachpaircopy (_, Framework* framework, frameworks) {
         killFramework(framework);
       }
       return;
     }
+  }
+}
 
-    // Otherwise, don't terminate and see what the message is.
-    switch (msgid()) {
-      case NEW_MASTER_DETECTED: {
-        const MSG<NEW_MASTER_DETECTED>& msg = message();
-
-	LOG(INFO) << "New master at " << msg.pid();
-
-	master = msg.pid();
-	link(master);
-
-	if (slaveId == "") {
-	  // Slave started before master.
-          MSG<S2M_REGISTER_SLAVE> out;
-          out.mutable_slave()->MergeFrom(slave);
-	  send(master, out);
-	} else {
-	  // Re-registering, so send tasks running.
-          MSG<S2M_REREGISTER_SLAVE> out;
-          out.mutable_slave_id()->MergeFrom(slaveId);
-          out.mutable_slave()->MergeFrom(slave);
-
-	  foreachpair (_, Framework* framework, frameworks) {
-	    foreachpair (_, Executor* executor, framework->executors) {
-              foreachpair (_, Task* task, executor->tasks) {
-                out.add_tasks()->MergeFrom(*task);
-              }
-            }
-          }
-
-	  send(master, out);
-	}
-	break;
-      }
-	
-      case NO_MASTER_DETECTED: {
-	LOG(INFO) << "Lost master(s) ... waiting";
-	break;
-      }
 
-      case MASTER_DETECTION_FAILURE: {
-	LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
-	break;
-      }
+void Slave::initialize()
+{
+  install(NEW_MASTER_DETECTED, &Slave::newMasterDetected,
+          &NewMasterDetectedMessage::pid);
 
-      case M2S_REGISTER_REPLY: {
-        const MSG<M2S_REGISTER_REPLY>& msg = message();
-        slaveId = msg.slave_id();
+  install(NO_MASTER_DETECTED, &Slave::noMasterDetected);
 
-        LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+  install(MASTER_DETECTION_FAILURE, &Slave::masterDetectionFailure);
 
-        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
-        link(spawn(heart));
-        break;
-      }
-      
-      case M2S_REREGISTER_REPLY: {
-        const MSG<M2S_REREGISTER_REPLY>& msg = message();
+  install(M2S_REGISTER_REPLY, &Slave::registerReply,
+          &SlaveRegisteredMessage::slave_id,
+          &SlaveRegisteredMessage::heartbeat_interval);
 
-        LOG(INFO) << "Re-registered with master";
+  install(M2S_REREGISTER_REPLY, &Slave::reregisterReply,
+          &SlaveRegisteredMessage::slave_id,
+          &SlaveRegisteredMessage::heartbeat_interval);
 
-        if (!(slaveId == msg.slave_id())) {
-          LOG(FATAL) << "Slave re-registered but got wrong ID";
-        }
+  install(M2S_RUN_TASK, &Slave::runTask,
+          &RunTaskMessage::framework,
+          &RunTaskMessage::framework_id,
+          &RunTaskMessage::pid,
+          &RunTaskMessage::task);
 
-        if (heart != NULL) {
-          send(heart->self(), MESOS_MSGID);
-          wait(heart->self());
-          delete heart;
-        }
+  install(M2S_KILL_TASK, &Slave::killTask,
+          &KillTaskMessage::framework_id,
+          &KillTaskMessage::task_id);
 
-        heart = new Heart(master, self(), slaveId, msg.heartbeat_interval());
-        link(spawn(heart));
-        break;
-      }
-      
-      case M2S_RUN_TASK: {
-        const MSG<M2S_RUN_TASK>& msg = message();
-
-        const TaskDescription& task = msg.task();
-
-        LOG(INFO) << "Got assigned task " << task.task_id()
-                  << " for framework " << msg.framework_id();
-
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework == NULL) {
-          framework =
-            new Framework(msg.framework_id(), msg.framework(), msg.pid());
-          frameworks[msg.framework_id()] = framework;
-        }
+  install(M2S_KILL_FRAMEWORK, &Slave::killFramework,
+          &KillFrameworkMessage::framework_id);
 
-        // Either send the task to an executor or start a new executor
-        // and queue the task until the executor has started.
-        Executor* executor = task.has_executor()
-          ? framework->getExecutor(task.executor().executor_id())
-          : framework->getExecutor(framework->info.executor().executor_id());
-        
-        if (executor != NULL) {
-          if (!executor->pid) {
-            // Queue task until the executor starts up.
-            executor->queuedTasks.push_back(task);
-          } else {
-            // Add the task to the executor.
-            executor->addTask(task);
-
-            MSG<S2E_RUN_TASK> out;
-            out.mutable_framework()->MergeFrom(framework->info);
-            out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-            out.set_pid(framework->pid);
-            out.mutable_task()->MergeFrom(task);
-            send(executor->pid, out);
-            isolationModule->resourcesChanged(framework, executor);
-          }
-        } else {
-          // Launch an executor for this task.
-          if (task.has_executor()) {
-            executor = framework->createExecutor(task.executor());
-          } else {
-            executor = framework->createExecutor(framework->info.executor());
-          }
+  install(M2S_FRAMEWORK_MESSAGE, &Slave::schedulerMessage,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::message);
 
-          // Queue task until the executor starts up.
-          executor->queuedTasks.push_back(task);
+  install(M2S_UPDATE_FRAMEWORK, &Slave::updateFramework,
+          &UpdateFrameworkMessage::framework_id,
+          &UpdateFrameworkMessage::pid);
 
-          // Tell the isolation module to launch the executor.
-          isolationModule->launchExecutor(framework, executor);
-        }
-        break;
-      }
+  install(M2S_STATUS_UPDATE_ACK, &Slave::statusUpdateAck,
+          &StatusUpdateAckMessage::framework_id,
+          &StatusUpdateAckMessage::slave_id,
+          &StatusUpdateAckMessage::task_id);
 
-      case M2S_KILL_TASK: {
-        const MSG<M2S_KILL_TASK>& msg = message();
+  install(E2S_REGISTER_EXECUTOR, &Slave::registerExecutor,
+          &RegisterExecutorMessage::framework_id,
+          &RegisterExecutorMessage::executor_id);
 
-        LOG(INFO) << "Asked to kill task " << msg.task_id()
-                  << " of framework " << msg.framework_id();
+  install(E2S_STATUS_UPDATE, &Slave::statusUpdate,
+          &StatusUpdateMessage::framework_id,
+          &StatusUpdateMessage::status);
 
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          // Tell the executor to kill the task if it is up and
-          // running, otherwise, consider the task lost.
-          Executor* executor = framework->getExecutor(msg.task_id());
-          if (executor == NULL || !executor->pid) {
-            // Update the resources locally, if an executor comes up
-            // after this then it just won't receive this task.
-            executor->removeTask(msg.task_id());
-            isolationModule->resourcesChanged(framework, executor);
-
-            MSG<S2M_STATUS_UPDATE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            TaskStatus *status = out.mutable_status();
-            status->mutable_task_id()->MergeFrom(msg.task_id());
-            status->mutable_slave_id()->MergeFrom(slaveId);
-            status->set_state(TASK_LOST);
-            send(master, out);
-
-            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-            framework->statuses[deadline][status->task_id()] = *status;
-          } else {
-            // Otherwise, send a message to the executor and wait for
-            // it to send us a status update.
-            MSG<S2E_KILL_TASK> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_task_id()->MergeFrom(msg.task_id());
-            send(executor->pid, out);
-          }
-        } else {
-          LOG(WARNING) << "Cannot kill task " << msg.task_id()
-                       << " of framework " << msg.framework_id()
-                       << " because no such framework is running";
+  install(E2S_FRAMEWORK_MESSAGE, &Slave::executorMessage,
+          &FrameworkMessageMessage::framework_id,
+          &FrameworkMessageMessage::message);
 
-          MSG<S2M_STATUS_UPDATE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          TaskStatus *status = out.mutable_status();
-          status->mutable_task_id()->MergeFrom(msg.task_id());
-          status->mutable_slave_id()->MergeFrom(slaveId);
-          status->set_state(TASK_LOST);
-          send(master, out);
+  install(process::TIMEOUT, &Slave::timeout);
 
-          double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-          framework->statuses[deadline][status->task_id()] = *status;
+  install(process::EXITED, &Slave::exited);
+}
+
+
+void Slave::newMasterDetected(const string& pid)
+{
+  LOG(INFO) << "New master at " << pid;
+
+  master = pid;
+  link(master);
+
+  if (slaveId == "") {
+    // Slave started before master.
+    MSG<S2M_REGISTER_SLAVE> out;
+    out.mutable_slave()->MergeFrom(slave);
+    send(master, out);
+  } else {
+    // Re-registering, so send tasks running.
+    MSG<S2M_REREGISTER_SLAVE> out;
+    out.mutable_slave_id()->MergeFrom(slaveId);
+    out.mutable_slave()->MergeFrom(slave);
+
+    foreachpair (_, Framework* framework, frameworks) {
+      foreachpair (_, Executor* executor, framework->executors) {
+        foreachpair (_, Task* task, executor->tasks) {
+          out.add_tasks()->MergeFrom(*task);
         }
-        break;
       }
+    }
 
-      case M2S_KILL_FRAMEWORK: {
-        const MSG<M2S_KILL_FRAMEWORK>&msg = message();
+    send(master, out);
+  }
+}
 
-        LOG(INFO) << "Asked to kill framework " << msg.framework_id();
 
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL)
-          killFramework(framework);
-        break;
-      }
+void Slave::noMasterDetected()
+{
+  LOG(INFO) << "Lost master(s) ... waiting";
+}
 
-      case M2S_FRAMEWORK_MESSAGE: {
-        const MSG<M2S_FRAMEWORK_MESSAGE>&msg = message();
 
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          const FrameworkMessage& message = msg.message();
-
-          Executor* executor = framework->getExecutor(message.executor_id());
-          if (executor == NULL) {
-            LOG(WARNING) << "Dropping message for executor '"
-                         << message.executor_id() << "' of framework "
-                         << msg.framework_id()
-                         << " because executor does not exist";
-          } else if (!executor->pid) {
-            // TODO(*): If executor is not started, queue framework message?
-            // (It's probably okay to just drop it since frameworks can have
-            // the executor send a message to the master to say when it's ready.)
-            LOG(WARNING) << "Dropping message for executor '"
-                         << message.executor_id() << "' of framework "
-                         << msg.framework_id()
-                         << " because executor is not running";
-          } else {
-            MSG<S2E_FRAMEWORK_MESSAGE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_message()->MergeFrom(message);
-            send(executor->pid, out);
-          }
-        } else {
-          LOG(WARNING) << "Dropping message for framework "
-                       << msg.framework_id()
-                       << " because it does not exist";
-        }
-        break;
-      }
+void Slave::masterDetectionFailure()
+{
+  LOG(FATAL) << "Cannot reliably detect master ... committing suicide!";
+}
 
-      case M2S_UPDATE_FRAMEWORK: {
-        const MSG<M2S_UPDATE_FRAMEWORK>&msg = message();
 
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          LOG(INFO) << "Updating framework " << msg.framework_id()
-                    << " pid to " << msg.pid();
-          framework->pid = msg.pid();
-        }
-        break;
-      }
+void Slave::registerReply(const SlaveID& slaveId, double heartbeat_interval)
+{
+  LOG(INFO) << "Registered with master; given slave ID " << slaveId;
+  this->slaveId = slaveId;
+  heart = new Heart(master, self(), slaveId, heartbeat_interval);
+  link(spawn(heart));
+}
 
-      case M2S_STATUS_UPDATE_ACK: {
-        const MSG<M2S_STATUS_UPDATE_ACK>& msg = message();
 
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          foreachpair (double deadline, _, framework->statuses) {
-            if (framework->statuses[deadline].count(msg.task_id()) > 0) {
-              LOG(INFO) << "Got acknowledgement of status update"
-                        << " for task " << msg.task_id()
-                        << " of framework " << framework->frameworkId;
-              framework->statuses[deadline].erase(msg.task_id());
-              break;
-            }
-          }
-        }
-        break;
-      }
+void Slave::reregisterReply(const SlaveID& slaveId, double heartbeat_interval)
+{
+  LOG(INFO) << "Re-registered with master";
 
-      case E2S_REGISTER_EXECUTOR: {
-        const MSG<E2S_REGISTER_EXECUTOR>& msg = message();
+  if (!(this->slaveId == slaveId)) {
+    LOG(FATAL) << "Slave re-registered but got wrong ID";
+  }
 
-        LOG(INFO) << "Got registration for executor '"
-                  << msg.executor_id() << "' of framework "
-                  << msg.framework_id();
-
-        Framework* framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          Executor* executor = framework->getExecutor(msg.executor_id());
-
-          // Check the status of the executor.
-          if (executor == NULL) {
-            LOG(WARNING) << "Not expecting executor '" << msg.executor_id()
-                         << "' of framework " << msg.framework_id();
-            send(from(), S2E_KILL_EXECUTOR);
-          } else if (executor->pid != UPID()) {
-            LOG(WARNING) << "Not good, executor '" << msg.executor_id()
-                         << "' of framework " << msg.framework_id()
-                         << " is already running";
-            send(from(), S2E_KILL_EXECUTOR);
-          } else {
-            // Save the pid for the executor.
-            executor->pid = from();
-
-            // Now that the executor is up, set its resource limits.
-            isolationModule->resourcesChanged(framework, executor);
-
-            // Tell executor it's registered and give it any queued tasks.
-            MSG<S2E_REGISTER_REPLY> out;
-            ExecutorArgs* args = out.mutable_args();
-            args->mutable_framework_id()->MergeFrom(framework->frameworkId);
-            args->set_name(framework->info.name());
-            args->mutable_slave_id()->MergeFrom(slaveId);
-            args->set_hostname(hostname);
-            args->set_data(framework->info.executor().data());
-            send(executor->pid, out);
-            sendQueuedTasks(framework, executor);
-          }
-        } else {
-          // Framework is gone; tell the executor to exit.
-          LOG(WARNING) << "Framework " << msg.framework_id()
-                       << " does not exist (it may have been killed),"
-                       << " telling executor to exit";
-
-          // TODO(benh): Don't we also want to tell the isolation
-          // module to shut this guy down!
-          send(from(), S2E_KILL_EXECUTOR);
-        }
-        break;
-      }
+  if (heart != NULL) {
+    send(heart->self(), process::TERMINATE);
+    wait(heart->self());
+    delete heart;
+  }
 
-      case E2S_STATUS_UPDATE: {
-        const MSG<E2S_STATUS_UPDATE>& msg = message();
+  heart = new Heart(master, self(), slaveId, heartbeat_interval);
+  link(spawn(heart));
+}
 
-        const TaskStatus& status = msg.status();
 
-	LOG(INFO) << "Status update: task " << status.task_id()
-		  << " of framework " << msg.framework_id()
-		  << " is now in state "
-		  << TaskState_descriptor()->FindValueByNumber(status.state())->name();
-
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-          Executor* executor = framework->getExecutor(status.task_id());
-          if (executor != NULL) {
-            if (status.state() == TASK_FINISHED ||
-                status.state() == TASK_FAILED ||
-                status.state() == TASK_KILLED ||
-                status.state() == TASK_LOST) {
-              executor->removeTask(status.task_id());
-              isolationModule->resourcesChanged(framework, executor);
-            }
-
-            // Send message and record the status for possible resending.
-            MSG<S2M_STATUS_UPDATE> out;
-            out.mutable_framework_id()->MergeFrom(msg.framework_id());
-            out.mutable_status()->MergeFrom(status);
-            send(master, out);
-
-            double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
-            framework->statuses[deadline][status.task_id()] = status;
-          } else {
-            LOG(WARNING) << "Status update error: couldn't lookup "
-                         << "executor for framework " << msg.framework_id();
-          }
-	} else {
-          LOG(WARNING) << "Status update error: couldn't lookup "
-                       << "framework " << msg.framework_id();
-	}
-        break;
-      }
+// Handles a task assignment for a framework.
+//
+// If the framework is not yet known, a Framework record is created
+// from frameworkInfo and pid. This is the only use of `pid` here.
+//
+// The task then goes to one of three places:
+//   - an executor that has a pid (has registered): the task is added
+//     and sent in S2E_RUN_TASK, then the isolation module is told
+//     that the executor's resources changed;
+//   - an executor that exists but has no pid yet: the task is queued;
+//   - no such executor: one is created from the task's ExecutorInfo
+//     (or the framework's default), the task is queued, and the
+//     isolation module is asked to launch the executor.
+//
+// NOTE(review): for an already-known framework the `pid` argument is
+// ignored; updateFramework() is what changes an existing pid.
+void Slave::runTask(const FrameworkInfo& frameworkInfo,
+                    const FrameworkID& frameworkId,
+                    const string& pid,
+                    const TaskDescription& task)
+{
+  LOG(INFO) << "Got assigned task " << task.task_id()
+            << " for framework " << frameworkId;
 
-      case E2S_FRAMEWORK_MESSAGE: {
-        const MSG<E2S_FRAMEWORK_MESSAGE>& msg = message();
+  // Unknown framework: start tracking it.
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    framework = new Framework(frameworkId, frameworkInfo, pid);
+    frameworks[frameworkId] = framework;
+  }
 
-        const FrameworkMessage& message = msg.message();
+  // Either send the task to an executor or start a new executor
+  // and queue the task until the executor has started.
+  // A task may name its own executor; otherwise the framework's
+  // default executor is used.
+  Executor* executor = task.has_executor()
+    ? framework->getExecutor(task.executor().executor_id())
+    : framework->getExecutor(framework->info.executor().executor_id());
+        
+  if (executor != NULL) {
+    if (!executor->pid) {
+      // Queue task until the executor starts up.
+      executor->queuedTasks.push_back(task);
+    } else {
+      // Add the task to the executor.
+      executor->addTask(task);
 
-        Framework *framework = getFramework(msg.framework_id());
-        if (framework != NULL) {
-	  LOG(INFO) << "Sending message for framework "
-                    << framework->frameworkId
-		    << " to " << framework->pid;
-
-          // TODO(benh): This is weird, sending an M2F message.
-          MSG<M2F_FRAMEWORK_MESSAGE> out;
-          out.mutable_framework_id()->MergeFrom(msg.framework_id());
-          out.mutable_message()->MergeFrom(message);
-          out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
-          send(framework->pid, out);
-        }
-        break;
-      }
+      MSG<S2E_RUN_TASK> out;
+      out.mutable_framework()->MergeFrom(framework->info);
+      out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+      out.set_pid(framework->pid);
+      out.mutable_task()->MergeFrom(task);
+      send(executor->pid, out);
+      // The executor now holds one more task; let the isolation
+      // module adjust its resource limits.
+      isolationModule->resourcesChanged(framework, executor);
+    }
+  } else {
+    // Launch an executor for this task.
+    if (task.has_executor()) {
+      executor = framework->createExecutor(task.executor());
+    } else {
+      executor = framework->createExecutor(framework->info.executor());
+    }
+
+    // Queue task until the executor starts up.
+    executor->queuedTasks.push_back(task);
+
+    // Tell the isolation module to launch the executor.
+    isolationModule->launchExecutor(framework, executor);
+  }
+}
+
+
+// Handles a request to kill a task.
+//
+// If the task's executor has registered (has a pid), S2E_KILL_TASK is
+// forwarded to it, and the executor is expected to report back with a
+// status update. Otherwise the task is reported as TASK_LOST directly
+// to the master. This covers an unknown framework, no executor owning
+// the task, and an executor that has not registered yet.
+void Slave::killTask(const FrameworkID& frameworkId,
+                     const TaskID& taskId)
+{
+  LOG(INFO) << "Asked to kill task " << taskId
+            << " of framework " << frameworkId;
 
-      case PROCESS_EXIT: {
-        LOG(INFO) << "Process exited: " << from();
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Cannot kill task " << taskId
+                 << " of framework " << frameworkId
+                 << " because no such framework is running";
+
+    // There is no Framework to hold a retry record (framework is NULL),
+    // so this TASK_LOST is sent once and not queued for resending.
+    MSG<S2M_STATUS_UPDATE> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus *status = out.mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->mutable_slave_id()->MergeFrom(slaveId);
+    status->set_state(TASK_LOST);
+    send(master, out);
+    return;
+  }
+
+  // Tell the executor to kill the task if it is up and
+  // running, otherwise, consider the task lost.
+  Executor* executor = framework->getExecutor(taskId);
+  if (executor == NULL || !executor->pid) {
+    // Update the resources locally, if an executor comes up
+    // after this then it just won't receive this task. When
+    // getExecutor() found no executor for the task there is
+    // nothing to update (and nothing to dereference).
+    if (executor != NULL) {
+      executor->removeTask(taskId);
+      isolationModule->resourcesChanged(framework, executor);
+    }
+
+    MSG<S2M_STATUS_UPDATE> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    TaskStatus *status = out.mutable_status();
+    status->mutable_task_id()->MergeFrom(taskId);
+    status->mutable_slave_id()->MergeFrom(slaveId);
+    status->set_state(TASK_LOST);
+    send(master, out);
 
-        if (from() == master) {
-	  LOG(WARNING) << "Master disconnected! "
-		       << "Waiting for a new master to be elected.";
-	  // TODO(benh): After so long waiting for a master, commit suicide.
-	}
+    // Record the update so timeout() resends it until
+    // statusUpdateAck() removes it.
+    double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+    framework->statuses[deadline][status->task_id()] = *status;
+  } else {
+    // Otherwise, send a message to the executor and wait for
+    // it to send us a status update.
+    MSG<S2E_KILL_TASK> out;
+    out.mutable_framework_id()->MergeFrom(frameworkId);
+    out.mutable_task_id()->MergeFrom(taskId);
+    send(executor->pid, out);
+  }
+}
+
+
+// Kills the framework with the given id by delegating to the
+// Framework* overload. Ids this slave does not know are ignored.
+void Slave::killFramework(const FrameworkID& frameworkId)
+{
+  LOG(INFO) << "Asked to kill framework " << frameworkId;
+
+  Framework* target = getFramework(frameworkId);
+  if (target == NULL) {
+    return;
+  }
+
+  killFramework(target);
+}
+
+
+// Forwards a FrameworkMessage to the executor named by
+// message.executor_id(), wrapped in S2E_FRAMEWORK_MESSAGE.
+// The message is dropped, with a warning, in three cases:
+//   - the framework is unknown;
+//   - the executor is unknown;
+//   - the executor has no pid yet.
+void Slave::schedulerMessage(const FrameworkID& frameworkId,
+                             const FrameworkMessage& message)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Dropping message for framework "<< frameworkId
+                 << " because it does not exist";
+    return;
+  }
+
+  Executor* target = framework->getExecutor(message.executor_id());
+  if (target == NULL) {
+    LOG(WARNING) << "Dropping message for executor '"
+                 << message.executor_id() << "' of framework " << frameworkId
+                 << " because executor does not exist";
+    return;
+  }
+
+  if (!target->pid) {
+    // TODO(*): If executor is not started, queue framework message?
+    // (It's probably okay to just drop it since frameworks can have
+    // the executor send a message to the master to say when it's ready.)
+    LOG(WARNING) << "Dropping message for executor '"
+                 << message.executor_id() << "' of framework " << frameworkId
+                 << " because executor is not running";
+    return;
+  }
+
+  MSG<S2E_FRAMEWORK_MESSAGE> forward;
+  forward.mutable_framework_id()->MergeFrom(frameworkId);
+  forward.mutable_message()->MergeFrom(message);
+  send(target->pid, forward);
+}
+
+
+// Replaces the recorded pid of an already-known framework. Unknown
+// framework ids are ignored.
+void Slave::updateFramework(const FrameworkID& frameworkId,
+                            const string& pid)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    return;
+  }
+
+  LOG(INFO) << "Updating framework " << frameworkId
+            << " pid to " << pid;
+  framework->pid = pid;
+}
+
+
+// Handles an acknowledgement of a status update.
+//
+// It erases the pending entry for taskId from framework->statuses, so
+// timeout() stops resending it. Only the first deadline bucket found to
+// contain the task is cleared, and unknown frameworks are ignored.
+//
+// NOTE(review): the slaveId parameter is unused here and shadows the
+// member of the same name.
+// NOTE(review): a bucket emptied by the erase stays in
+// framework->statuses; confirm that this growth is acceptable.
+void Slave::statusUpdateAck(const FrameworkID& frameworkId,
+                            const SlaveID& slaveId,
+                            const TaskID& taskId)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    foreachpair (double deadline, _, framework->statuses) {
+      if (framework->statuses[deadline].count(taskId) > 0) {
+        LOG(INFO) << "Got acknowledgement of status update"
+                  << " for task " << taskId
+                  << " of framework " << framework->frameworkId;
+        framework->statuses[deadline].erase(taskId);
         break;
       }
+    }
+  }
+}
 
-      case PROCESS_TIMEOUT: {
-        // Check and see if we should re-send any status updates.
-        foreachpair (_, Framework* framework, frameworks) {
-          foreachpair (double deadline, _, framework->statuses) {
-            if (deadline <= elapsed()) {
-              foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
-                LOG(WARNING) << "Resending status update"
-                             << " for task " << status.task_id()
-                             << " of framework " << framework->frameworkId;
-                MSG<S2M_STATUS_UPDATE> out;
-                out.mutable_framework_id()->MergeFrom(framework->frameworkId);
-                out.mutable_status()->MergeFrom(status);
-                send(master, out);
-              }
-            }
-          }
-        }
-        break;
+
+// Handles an executor's registration request.
+//
+// The sender is told to exit (S2E_KILL_EXECUTOR) in three cases:
+//   - the framework is unknown;
+//   - no executor with this id is expected;
+//   - an executor with this id already has a pid.
+//
+// Otherwise the sender's pid is recorded and the executor's resource
+// limits are applied. The executor then receives S2E_REGISTER_REPLY,
+// followed by any tasks queued for it.
+//
+// NOTE(review): ExecutorArgs.data always comes from the framework's
+// default executor, even for an executor created from a task's own
+// ExecutorInfo; confirm this is intended.
+void Slave::registerExecutor(const FrameworkID& frameworkId,
+                             const ExecutorID& executorId)
+{
+  LOG(INFO) << "Got registration for executor '" << executorId
+            << "' of framework " << frameworkId;
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    // Framework is gone; tell the executor to exit.
+    LOG(WARNING) << "Framework " << frameworkId
+                 << " does not exist (it may have been killed),"
+                 << " telling executor to exit";
+
+    // TODO(benh): Don't we also want to tell the isolation
+    // module to shut this guy down!
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  Executor* executor = framework->getExecutor(executorId);
+  if (executor == NULL) {
+    LOG(WARNING) << "Not expecting executor '" << executorId
+                 << "' of framework " << frameworkId;
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  if (executor->pid != UPID()) {
+    LOG(WARNING) << "Not good, executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " is already running";
+    send(from(), S2E_KILL_EXECUTOR);
+    return;
+  }
+
+  // Remember where this executor lives; with that known, its resource
+  // limits can be set.
+  executor->pid = from();
+  isolationModule->resourcesChanged(framework, executor);
+
+  // Acknowledge the registration, then hand over any queued tasks.
+  MSG<S2E_REGISTER_REPLY> reply;
+  ExecutorArgs* args = reply.mutable_args();
+  args->mutable_framework_id()->MergeFrom(framework->frameworkId);
+  args->set_name(framework->info.name());
+  args->mutable_slave_id()->MergeFrom(slaveId);
+  args->set_hostname(slave.hostname());
+  args->set_data(framework->info.executor().data());
+  send(executor->pid, reply);
+  sendQueuedTasks(framework, executor);
+}
+
+
+// Handles a task status update sent by an executor (formerly the
+// E2S_STATUS_UPDATE case).
+//
+// For a terminal state (FINISHED, FAILED, KILLED or LOST), the task is
+// removed from its executor and the isolation module is told that the
+// executor's resources changed. Every update is then forwarded to the
+// master and recorded in framework->statuses, so timeout() resends it
+// until statusUpdateAck() removes it.
+//
+// Updates for an unknown framework, or for a task with no executor,
+// are only logged.
+void Slave::statusUpdate(const FrameworkID& frameworkId,
+                         const TaskStatus& status)
+{
+  LOG(INFO) << "Status update: task " << status.task_id()
+            << " of framework " << frameworkId
+            << " is now in state "
+            << TaskState_descriptor()->FindValueByNumber(status.state())->name();
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    Executor* executor = framework->getExecutor(status.task_id());
+    if (executor != NULL) {
+      // Terminal states: drop the task and let the isolation module
+      // recompute the executor's resource limits.
+      if (status.state() == TASK_FINISHED ||
+          status.state() == TASK_FAILED ||
+          status.state() == TASK_KILLED ||
+          status.state() == TASK_LOST) {
+        executor->removeTask(status.task_id());
+        isolationModule->resourcesChanged(framework, executor);
       }
 
-      default: {
-        LOG(ERROR) << "Received unknown message (" << msgid()
-                   << ") from " << from();
-        break;
+      // Send message and record the status for possible resending.
+      MSG<S2M_STATUS_UPDATE> out;
+      out.mutable_framework_id()->MergeFrom(frameworkId);
+      out.mutable_status()->MergeFrom(status);
+      send(master, out);
+
+      double deadline = elapsed() + STATUS_UPDATE_RETRY_TIMEOUT;
+      framework->statuses[deadline][status.task_id()] = status;
+    } else {
+      LOG(WARNING) << "Status update error: couldn't lookup "
+                   << "executor for framework " << frameworkId;
+    }
+  } else {
+    LOG(WARNING) << "Status update error: couldn't lookup "
+                 << "framework " << frameworkId;
+  }
+}
+
+
+// Relays a framework message sent by an executor (formerly the
+// E2S_FRAMEWORK_MESSAGE case) to the framework's recorded pid. The
+// message is stamped with this slave's id.
+//
+// Messages for an unknown framework are dropped with a warning,
+// matching schedulerMessage(), instead of being discarded silently.
+void Slave::executorMessage(const FrameworkID& frameworkId,
+                            const FrameworkMessage& message)
+{
+  Framework* framework = getFramework(frameworkId);
+  if (framework == NULL) {
+    LOG(WARNING) << "Dropping message from executor for framework "
+                 << frameworkId << " because it does not exist";
+    return;
+  }
+
+  LOG(INFO) << "Sending message for framework "
+            << framework->frameworkId
+            << " to " << framework->pid;
+
+  // TODO(benh): This is weird, sending an M2F message.
+  MSG<M2F_FRAMEWORK_MESSAGE> out;
+  out.mutable_framework_id()->MergeFrom(frameworkId);
+  out.mutable_message()->MergeFrom(message);
+  out.mutable_message()->mutable_slave_id()->MergeFrom(slaveId);
+  send(framework->pid, out);
+}
+
+
+// Timeout handler (formerly the PROCESS_TIMEOUT case).
+//
+// For every framework, it resends to the master each recorded status
+// update whose retry deadline has passed. Entries stay in
+// framework->statuses until statusUpdateAck() erases them.
+//
+// NOTE(review): overdue entries keep their original deadline, so every
+// call resends them again; confirm this no-backoff retry is intended.
+void Slave::timeout()
+{
+  // Check and see if we should re-send any status updates.
+  foreachpair (_, Framework* framework, frameworks) {
+    foreachpair (double deadline, _, framework->statuses) {
+      if (deadline <= elapsed()) {
+        foreachpair (_, const TaskStatus& status, framework->statuses[deadline]) {
+          LOG(WARNING) << "Resending status update"
+                       << " for task " << status.task_id()
+                       << " of framework " << framework->frameworkId;
+          MSG<S2M_STATUS_UPDATE> out;
+          out.mutable_framework_id()->MergeFrom(framework->frameworkId);
+          out.mutable_status()->MergeFrom(status);
+          send(master, out);
+        }
+      }
     }
   }
 }
 
+// Handles notification that a process exited (formerly the
+// PROCESS_EXIT case). When that process is the master, this only
+// logs a warning; nothing else is done yet (see TODO).
+void Slave::exited()
+{
+  const bool masterLost = (from() == master);
+
+  LOG(INFO) << "Process exited: " << from();
+
+  if (!masterLost) {
+    return;
+  }
+
+  LOG(WARNING) << "Master disconnected! "
+               << "Waiting for a new master to be elected.";
+  // TODO(benh): After so long waiting for a master, commit suicide.
+}
+
+
+
 
 Framework* Slave::getFramework(const FrameworkID& frameworkId)
 {

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Sun Jun  5 09:18:22 2011
@@ -192,16 +192,13 @@ protected:
     link(slave);
     link(master);
     do {
-      switch (receive(interval)) {
-        case PROCESS_TIMEOUT: {
-          MSG<SH2M_HEARTBEAT> msg;
-          msg.mutable_slave_id()->MergeFrom(slaveId);
-          send(master, msg);
-          break;
-        }
-        case PROCESS_EXIT:
-        default:
-          return;
+      serve(interval);
+      if (name() == process::TIMEOUT) {
+        MSG<SH2M_HEARTBEAT> msg;
+        msg.mutable_slave_id()->MergeFrom(slaveId);
+        send(master, msg);
+      } else {
+        return;
       }
     } while (true);
   }
@@ -239,6 +236,34 @@ public:
 
   const Configuration& getConfiguration();
 
+  // Message handlers. They replace the former switch-style receive
+  // loop in operator().
+
+  // Master detection and (re)registration with the master.
+  void newMasterDetected(const std::string& pid);
+  void noMasterDetected();
+  void masterDetectionFailure();
+  void registerReply(const SlaveID& slaveId, double heartbeat_interval);
+  void reregisterReply(const SlaveID& slaveId, double heartbeat_interval);
+
+  // Task and framework management.
+  void runTask(const FrameworkInfo& frameworkInfo,
+               const FrameworkID& frameworkId,
+               const std::string& pid,
+               const TaskDescription& task);
+  void killTask(const FrameworkID& frameworkId,
+                const TaskID& taskId);
+  void killFramework(const FrameworkID& frameworkId);
+  void schedulerMessage(const FrameworkID& frameworkId,
+                        const FrameworkMessage& message);
+  void updateFramework(const FrameworkID& frameworkId,
+                       const std::string& pid);
+  // NOTE(review): slaveId is unused by the definition and shadows the
+  // slaveId member.
+  void statusUpdateAck(const FrameworkID& frameworkId,
+                       const SlaveID& slaveId,
+                       const TaskID& taskId);
+
+  // Messages from executors.
+  void registerExecutor(const FrameworkID& frameworkId,
+                        const ExecutorID& executorId);
+  void statusUpdate(const FrameworkID& frameworkId,
+                    const TaskStatus& status);
+  void executorMessage(const FrameworkID& frameworkId,
+                       const FrameworkMessage& message);
+
+  // Process events (formerly the PROCESS_TIMEOUT / PROCESS_EXIT cases).
+  void timeout();
+  void exited();
+
   // TODO(...): Don't make these instance variables public! Hack for
   // now because they are needed in the isolation modules.
   bool local;
@@ -247,6 +272,8 @@ public:
 protected:
   virtual void operator () ();
 
+  void initialize();
+
   Framework* getFramework(const FrameworkID& frameworkId);
 
   // Send any tasks queued up for the given framework to its executor
@@ -256,6 +283,8 @@ protected:
 private:
   Configuration conf;
 
+  // Description of this slave; registerExecutor() reports its
+  // hostname to executors.
+  SlaveInfo slave;
+
   process::UPID master;
   Resources resources;
 

Modified: incubator/mesos/trunk/src/tests/master_test.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/master_test.cpp?rev=1132277&r1=1132276&r2=1132277&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/master_test.cpp (original)
+++ incubator/mesos/trunk/src/tests/master_test.cpp Sun Jun  5 09:18:22 2011
@@ -494,7 +494,7 @@ TEST(MasterTest, SlavePartitioned)
   EXPECT_CALL(sched, slaveLost(&driver, _))
     .WillOnce(Trigger(&slaveLostCall));
 
-  EXPECT_MSG(filter, Eq(names[SH2M_HEARTBEAT]), _, _)
+  EXPECT_MSG(filter, Eq(SH2M_HEARTBEAT), _, _)
     .WillRepeatedly(Return(true));
 
   driver.start();
@@ -778,7 +778,7 @@ TEST(MasterTest, SchedulerFailoverStatus
   EXPECT_CALL(sched1, error(&driver1, _, "Framework failover"))
     .Times(1);
 
-  EXPECT_MSG(filter, Eq(names[M2F_STATUS_UPDATE]), _, Ne(master))
+  EXPECT_MSG(filter, Eq(M2F_STATUS_UPDATE), _, Ne(master))
     .WillOnce(DoAll(Trigger(&statusUpdateMsg), Return(true)))
     .RetiresOnSaturation();