You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:06 UTC

svn commit: r1468775 - in /incubator/mesos/trunk/src/slave: cgroups_isolator.hpp process_isolator.hpp slave.cpp slave.hpp

Author: vinodkone
Date: Wed Apr 17 07:09:05 2013
New Revision: 1468775

URL: http://svn.apache.org/r1468775
Log:
Added states to Slave struct.

Review: https://reviews.apache.org/r/10268

Modified:
    incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
    incubator/mesos/trunk/src/slave/process_isolator.hpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp

Modified: incubator/mesos/trunk/src/slave/cgroups_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/cgroups_isolator.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/cgroups_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/cgroups_isolator.hpp Wed Apr 17 07:09:05 2013
@@ -48,12 +48,6 @@
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace state {
-
-class State; // Forward declaration.
-
-} // namespace state {
-
 
 // TODO(bmahler): Migrate this into it's own file, along with moving
 // all cgroups code inside of a 'cgroups' directory.

Modified: incubator/mesos/trunk/src/slave/process_isolator.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.hpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.hpp Wed Apr 17 07:09:05 2013
@@ -40,12 +40,6 @@
 namespace mesos {
 namespace internal {
 namespace slave {
-namespace state {
-
-class State; // Forward declaration.
-
-} // namespace state {
-
 
 class ProcessIsolator : public Isolator, public ProcessExitedListener
 {

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:05 2013
@@ -79,6 +79,7 @@ Slave::Slave(const Resources& _resources
              Isolator* _isolator,
              Files* _files)
   : ProcessBase(ID::generate("slave")),
+    state(RECOVERING),
     flags(),
     local(_local),
     resources(_resources),
@@ -95,6 +96,7 @@ Slave::Slave(const slave::Flags& _flags,
              Isolator* _isolator,
              Files* _files)
   : ProcessBase(ID::generate("slave")),
+    state(RECOVERING),
     flags(_flags),
     local(_local),
     completedFrameworks(MAX_COMPLETED_FRAMEWORKS),
@@ -272,10 +274,6 @@ void Slave::initialize()
 
   startTime = Clock::now();
 
-  connected = false;
-
-  halting = false;
-
   // Install protobuf handlers.
   install<NewMasterDetectedMessage>(
       &Slave::newMasterDetected,
@@ -427,6 +425,12 @@ void Slave::_initialize(const Future<Not
   // in 'cleanup' mode.
   if (frameworks.empty() && flags.recover == "cleanup") {
     terminate(self());
+  } else {
+    // Register with the master.
+    state = DISCONNECTED;
+    if (master) {
+      doReliableRegistration();
+    }
   }
 }
 
@@ -452,7 +456,8 @@ void Slave::finalize()
     }
   }
 
-  if (flags.checkpoint && (halting || flags.recover == "cleanup")) {
+  if (flags.checkpoint &&
+      (state == TERMINATING || flags.recover == "cleanup")) {
     // We remove the "latest" symlink in meta directory, so that the
     // slave doesn't recover the state when it restarts and registers
     // as a new slave with the master.
@@ -479,7 +484,7 @@ void Slave::shutdown()
 
   LOG(INFO) << "Slave asked to shut down by " << from;
 
-  halting = true;
+  state = TERMINATING;
 
   if (frameworks.empty()) { // Terminate slave if there are no frameworks.
     terminate(self());
@@ -525,71 +530,132 @@ void Slave::newMasterDetected(const UPID
   master = pid;
   link(master);
 
-  connected = false;
+  // Inform the status updates manager about the new master.
+  statusUpdateManager->newMasterDetected(master);
 
-  // Do registration after recovery is complete.
-  // NOTE: Slave only registers with master when it is in "reconnect" mode.
-  // This ensures that master doesn't offer resources of a slave in "cleanup"
-  // mode.
-  if (flags.recover == "reconnect") {
-    recovered.future()
-      .onReady(defer(self(), &Self::doReliableRegistration, params::_1));
-  } else {
-    LOG(INFO)
-      << "Skipping registration because slave is started in 'cleanup' mode";
+  if (flags.recover == "cleanup") {
+    LOG(INFO) << "Skipping registration because slave is in 'cleanup' mode";
+    return;
   }
 
-  // Inform the status updates manager about the new master.
-  statusUpdateManager->newMasterDetected(master);
+  switch (state) {
+    case RECOVERING:
+      LOG(INFO) << "Postponing registration until recovery is complete";
+      break;
+    case DISCONNECTED:
+    case RUNNING:
+      state = DISCONNECTED;
+      doReliableRegistration();
+      break;
+    case TERMINATING:
+      LOG(INFO) << "Skipping registration because slave is terminating";
+      break;
+    default:
+      LOG(FATAL) << "Unexpected slave state " << state;
+      break;
+  }
 }
 
 
 void Slave::noMasterDetected()
 {
   LOG(INFO) << "Lost master(s) ... waiting";
-  connected = false;
   master = UPID();
+
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  // We only change state if the slave is in RUNNING state because
+  // if the slave is in:
+  // RECOVERING: Slave needs to finish recovery before changing states.
+  // DISCONNECTED: Redundant.
+  // TERMINATING: Slave is shutting down.
+  // TODO(vinod): Subscribe to master detector after recovery.
+  // Similarly, unsubscribe from master detector during termination.
+  // Currently it is tricky because master detector is injected into
+  // the slave from outside.
+  if (state == RUNNING) {
+    state = DISCONNECTED;
+  }
 }
 
 
 void Slave::registered(const SlaveID& slaveId)
 {
-  LOG(INFO) << "Registered with master; given slave ID " << slaveId;
-  info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
-  connected = true;
-
-  if (flags.checkpoint) {
-    // Create the slave meta directory.
-    paths::createSlaveDirectory(paths::getMetaRootDir(flags.work_dir), slaveId);
-
-    // Checkpoint slave info.
-    const string& path = paths::getSlaveInfoPath(
-        paths::getMetaRootDir(flags.work_dir), slaveId);
+  switch(state) {
+    case DISCONNECTED: {
+      LOG(INFO) << "Registered with master " << master
+                << "; given slave ID " << slaveId;
+
+      state = RUNNING;
+      info.mutable_id()->CopyFrom(slaveId); // Store the slave id.
+
+      if (flags.checkpoint) {
+        // Create the slave meta directory.
+        paths::createSlaveDirectory(paths::getMetaRootDir(flags.work_dir), slaveId);
+
+        // Checkpoint slave info.
+        const string& path = paths::getSlaveInfoPath(
+            paths::getMetaRootDir(flags.work_dir), slaveId);
 
-    CHECK_SOME(state::checkpoint(path, info));
+        CHECK_SOME(state::checkpoint(path, info));
+      }
+      break;
+    }
+    case RUNNING:
+      // Already registered. Ignore registration.
+      break;
+    case TERMINATING:
+      LOG(WARNING) << "Ignoring registration because slave is terminating";
+      break;
+    case RECOVERING:
+    default:
+      LOG(FATAL) << "Unexpected slave state " << state;
+      break;
   }
 }
 
 
 void Slave::reregistered(const SlaveID& slaveId)
 {
-  LOG(INFO) << "Re-registered with master";
-
-  if (!(info.id() == slaveId)) {
-    LOG(FATAL) << "Slave re-registered but got wrong ID";
+  switch(state) {
+    case DISCONNECTED:
+      LOG(INFO) << "Re-registered with master " << master;
+
+      state = RUNNING;
+      if (!(info.id() == slaveId)) {
+        LOG(FATAL) << "Slave re-registered but got wrong id: " << slaveId
+                   << "(expected: " << info.id() << ")";
+      }
+      break;
+    case RUNNING:
+      // Already registered. Ignore registration.
+      break;
+    case TERMINATING:
+      LOG(WARNING) << "Ignoring re-registration because slave is terminating";
+      break;
+    case RECOVERING:
+    default:
+      LOG(FATAL) << "Unexpected slave state " << state;
+      break;
   }
-  connected = true;
 }
 
 
-void Slave::doReliableRegistration(const Future<Nothing>& future)
+void Slave::doReliableRegistration()
 {
-  CHECK(future.isReady());
+  if (!master) {
+    LOG(INFO) << "Skipping registration because no master present";
+    return;
+  }
 
-  if (connected || !master) {
+  if (state == RUNNING) { // Slave (re-)registered with the master.
     return;
   }
 
+  CHECK(state == DISCONNECTED || state == TERMINATING) << state;
+
   if (info.id() == "") {
     // Slave started before master.
     // (Vinod): Is the above comment true?
@@ -628,7 +694,7 @@ void Slave::doReliableRegistration(const
   }
 
   // Retry registration if necessary.
-  delay(Seconds(1.0), self(), &Slave::doReliableRegistration, future);
+  delay(Seconds(1.0), self(), &Slave::doReliableRegistration);
 }
 
 
@@ -643,6 +709,26 @@ void Slave::runTask(
   LOG(INFO) << "Got assigned task " << task.task_id()
             << " for framework " << frameworkId;
 
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Cannot run task " << task.task_id()
+                 << " of framework " << frameworkId
+                 << " because the slave is in " << state << " state";
+
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        task.task_id(),
+        TASK_LOST,
+        "Slave is not in RUNNING state");
+
+    statusUpdate(update);
+    return;
+  }
+
   // TODO(vinod): Do this check in the master instead.
   if (frameworkInfo.checkpoint() && !flags.checkpoint) {
      LOG(WARNING) << "Asked to checkpoint framework " << frameworkId
@@ -741,7 +827,7 @@ void Slave::runTask(
       statusUpdate(update);
       break;
     }
-    case Executor::REGISTERING: {
+    case Executor::REGISTERING:
       // Checkpoint the task before we do anything else (this is a no-op
       // if the framework doesn't have checkpointing enabled).
       executor->checkpointTask(task);
@@ -755,7 +841,6 @@ void Slave::runTask(
 
       executor->queuedTasks[task.task_id()] = task;
       break;
-    }
     case Executor::RUNNING: {
       // Checkpoint the task before we do anything else (this is a no-op
       // if the framework doesn't have checkpointing enabled).
@@ -801,6 +886,26 @@ void Slave::killTask(const FrameworkID& 
   LOG(INFO) << "Asked to kill task " << taskId
             << " of framework " << frameworkId;
 
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Cannot kill task " << taskId
+                 << " of framework " << frameworkId
+                 << " because the slave is in " << state << " state";
+
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        taskId,
+        TASK_LOST,
+        "Slave is not in RUNNING state");
+
+    statusUpdate(update);
+    return;
+  }
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
     LOG(WARNING) << "Ignoring kill task " << taskId
@@ -866,13 +971,12 @@ void Slave::killTask(const FrameworkID& 
       break;
     }
     case Executor::TERMINATING:
-    case Executor::TERMINATED: {
+    case Executor::TERMINATED:
       LOG(WARNING) << "Ignoring kill task " << taskId
                    << " of framework " << frameworkId
                    << " because the executor '" << executor->id
                    << "' is terminating/terminated";
       break;
-    }
     case Executor::RUNNING: {
       // Send a message to the executor and wait for
       // it to send us a status update.
@@ -902,13 +1006,24 @@ void Slave::shutdownFramework(const Fram
   // its called directly (e.g. Slave::finalize()) or
   // its a message from the currently registered master.
   if (from && from != master) {
-    LOG(WARNING) << "Ignoring shutdown framework message from " << from
-                 << " because it is not from the registered master ("
-                 << master << ")";
+    LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
+                 << " from " << from << "because it is not from the registered "
+                 << "master (" << master << ")";
     return;
   }
 
-  LOG(INFO) << "Asked to shut down framework " << frameworkId << " by " << from;
+  LOG(INFO) << "Asked to shut down framework " << frameworkId
+            << " by " << from;
+
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING || state == DISCONNECTED) {
+    LOG(WARNING) << "Ignoring shutdown framework message for " << frameworkId
+                 << " because the slave has not yet registered with the master";
+    return;
+  }
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -917,12 +1032,11 @@ void Slave::shutdownFramework(const Fram
   }
 
   switch (framework->state) {
-    case Framework::TERMINATING: {
+    case Framework::TERMINATING:
       LOG(WARNING) << "Ignoring shutdown framework " << framework->id
                    << " because it is terminating";
       break;
-    }
-    case Framework::RUNNING: {
+    case Framework::RUNNING:
       LOG(INFO) << "Shutting down framework " << framework->id;
 
       framework->state = Framework::TERMINATING;
@@ -948,7 +1062,6 @@ void Slave::shutdownFramework(const Fram
         }
       }
       break;
-    }
     default:
       LOG(FATAL) << "Framework " << frameworkId
                  << " is in unexpected state " << framework->state;
@@ -963,9 +1076,21 @@ void Slave::schedulerMessage(
     const ExecutorID& executorId,
     const string& data)
 {
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Dropping message from framework "<< frameworkId
+                 << " because the slave is in " << state << " state";
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-    LOG(WARNING) << "Dropping message for framework "<< frameworkId
+    LOG(WARNING) << "Dropping message from framework "<< frameworkId
                  << " because framework does not exist";
     stats.invalidFrameworkMessages++;
     return;
@@ -976,7 +1101,7 @@ void Slave::schedulerMessage(
     << framework->state;
 
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Dropping message for framework "<< frameworkId
+    LOG(WARNING) << "Dropping message from framework "<< frameworkId
                  << " because framework is terminating";
     stats.invalidFrameworkMessages++;
     return;
@@ -994,7 +1119,7 @@ void Slave::schedulerMessage(
   switch (executor->state) {
     case Executor::REGISTERING:
     case Executor::TERMINATING:
-    case Executor::TERMINATED: {
+    case Executor::TERMINATED:
       // TODO(*): If executor is not yet registered, queue framework
       // message? It's probably okay to just drop it since frameworks
       // can have the executor send a message to the master to say when
@@ -1004,7 +1129,6 @@ void Slave::schedulerMessage(
                    << " because executor is not running";
       stats.invalidFrameworkMessages++;
       break;
-    }
     case Executor::RUNNING: {
       FrameworkToExecutorMessage message;
       message.mutable_slave_id()->MergeFrom(slaveId);
@@ -1026,6 +1150,17 @@ void Slave::schedulerMessage(
 
 void Slave::updateFramework(const FrameworkID& frameworkId, const string& pid)
 {
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Dropping updateFramework message for "<< frameworkId
+                 << " because the slave is in " << state << " state";
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
     LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
@@ -1034,11 +1169,10 @@ void Slave::updateFramework(const Framew
   }
 
   switch (framework->state) {
-    case Framework::TERMINATING: {
+    case Framework::TERMINATING:
       LOG(WARNING) << "Ignoring updating pid for framework " << frameworkId
                    << " because it is terminating";
       break;
-    }
     case Framework::RUNNING: {
       LOG(INFO) << "Updating framework " << frameworkId << " pid to " << pid;
 
@@ -1109,6 +1243,10 @@ void Slave::_statusUpdateAcknowledgement
             << " acknowledgement for task " << taskId
             << " of framework " << frameworkId;
 
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
     LOG(ERROR) << "Status update acknowledgement for task " << taskId
@@ -1151,6 +1289,26 @@ void Slave::registerExecutor(
   LOG(INFO) << "Got registration for executor '" << executorId
             << "' of framework " << frameworkId;
 
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state == RECOVERING) {
+    LOG(WARNING) << "Shutting down executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the slave is still recovering";
+    reply(ShutdownExecutorMessage());
+    return;
+  }
+
+  if (state == TERMINATING) {
+    LOG(WARNING) << "Shutting down executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the slave is terminating";
+    reply(ShutdownExecutorMessage());
+    return;
+  }
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
     LOG(WARNING) << " Shutting down executor '" << executorId
@@ -1189,13 +1347,12 @@ void Slave::registerExecutor(
     case Executor::TERMINATED:
       // TERMINATED is possible if the executor forks, the parent process
       // terminates and the child process (driver) tries to register!
-    case Executor::RUNNING: {
+    case Executor::RUNNING:
       LOG(WARNING) << "Shutting down executor '" << executorId
                    << "' of framework " << frameworkId
                    << " because it is in unexpected state " << executor->state;
       reply(ShutdownExecutorMessage());
       break;
-    }
     case Executor::REGISTERING: {
       executor->state = Executor::RUNNING;
 
@@ -1274,6 +1431,18 @@ void Slave::reregisterExecutor(
     const vector<TaskInfo>& tasks,
     const vector<StatusUpdate>& updates)
 {
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RECOVERING) {
+    LOG(WARNING) << "Shutting down executor '" << executorId
+                 << "' of framework " << frameworkId
+                 << " because the slave is not in recovery mode";
+    reply(ShutdownExecutorMessage());
+    return;
+  }
+
   LOG(INFO) << "Re-registering executor " << executorId
             << " of framework " << frameworkId;
 
@@ -1301,13 +1470,12 @@ void Slave::reregisterExecutor(
     case Executor::TERMINATED:
       // TERMINATED is possible if the executor forks, the parent process
       // terminates and the child process (driver) tries to register!
-    case Executor::RUNNING: {
+    case Executor::RUNNING:
       LOG(WARNING) << "Shutting down executor '" << executorId
                    << "' of framework " << frameworkId
                    << " because it is in unexpected state " << executor->state;
       reply(ShutdownExecutorMessage());
       break;
-    }
     case Executor::REGISTERING: {
       executor->state = Executor::RUNNING;
 
@@ -1368,6 +1536,8 @@ void Slave::reregisterExecutor(
 
 void Slave::reregisterExecutorTimeout()
 {
+  CHECK(state == RECOVERING || state == TERMINATING) << state;
+
   LOG(INFO) << "Cleaning up un-reregistered executors";
 
   foreachvalue (Framework* framework, frameworks) {
@@ -1379,10 +1549,9 @@ void Slave::reregisterExecutorTimeout()
       switch (executor->state) {
         case Executor::RUNNING:     // Executor re-registered.
         case Executor::TERMINATING:
-        case Executor::TERMINATED: {
+        case Executor::TERMINATED:
           break;
-        }
-        case Executor::REGISTERING: {
+        case Executor::REGISTERING:
           // If we are here, the executor must have been hung and not
           // exited! This is because if the executor properly exited,
           // it should have already been identified by the isolator
@@ -1395,7 +1564,6 @@ void Slave::reregisterExecutorTimeout()
           dispatch(
               isolator, &Isolator::killExecutor, framework->id, executor->id);
           break;
-        }
         default:
           LOG(FATAL) << "Executor '" << executor->id
                      << "' of framework " << framework->id
@@ -1415,6 +1583,10 @@ void Slave::reregisterExecutorTimeout()
 // 2) When slave generates task updates (e.g LOST/KILLED/FAILED).
 void Slave::statusUpdate(const StatusUpdate& update)
 {
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
   const TaskStatus& status = update.status();
 
   Framework* framework = getFramework(update.framework_id());
@@ -1535,10 +1707,22 @@ void Slave::executorMessage(
     const ExecutorID& executorId,
     const string& data)
 {
+  CHECK(state == RECOVERING || state == DISCONNECTED ||
+        state == RUNNING || state == TERMINATING)
+    << state;
+
+  if (state != RUNNING) {
+    LOG(WARNING) << "Dropping framework message from executor "
+                 << executorId << " to framework " << frameworkId
+                 << " because the slave is in " << state << " state";
+    stats.invalidFrameworkMessages++;
+    return;
+  }
+
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
-    LOG(WARNING) << "Cannot send framework message from slave "
-                 << slaveId << " to framework " << frameworkId
+    LOG(WARNING) << "Cannot send framework message from executor "
+                 << executorId << " to framework " << frameworkId
                  << " because framework does not exist";
     stats.invalidFrameworkMessages++;
     return;
@@ -1549,8 +1733,8 @@ void Slave::executorMessage(
     << framework->state;
 
   if (framework->state == Framework::TERMINATING) {
-    LOG(WARNING) << "Ignoring framework message from slave "
-                 << slaveId << " to framework " << frameworkId
+    LOG(WARNING) << "Ignoring framework message from executor "
+                 << executorId << " to framework " << frameworkId
                  << " because framework is terminating";
     stats.invalidFrameworkMessages++;
     return;
@@ -1640,14 +1824,13 @@ void Slave::executorStarted(
   }
 
   switch (executor->state) {
-    case Executor::TERMINATING: {
+    case Executor::TERMINATING:
       LOG(WARNING) << "Executor '" << executorId
                    << "' of framework " << frameworkId
                    << " is terminating";
       break;
-    }
     case Executor::REGISTERING:
-    case Executor::RUNNING: {
+    case Executor::RUNNING:
       monitor.watch(
           frameworkId,
           executorId,
@@ -1655,7 +1838,6 @@ void Slave::executorStarted(
           flags.resource_monitoring_interval)
         .onAny(lambda::bind(_watch, lambda::_1, frameworkId, executorId));
       break;
-    }
     case Executor::TERMINATED:
     default:
       LOG(FATAL) << " Executor '" << executorId
@@ -1921,7 +2103,7 @@ void Slave::remove(Framework* framework)
     // TODO(vinod): Instead of doing it this way, shutdownFramework()
     // and shutdownExecutor() could return Futures and a slave could
     // shutdown when all the Futures are satisfied (e.g., collect()).
-    if (halting ||
+    if (state == TERMINATING ||
         (flags.recover == "cleanup" && !recovered.future().isPending()) ) {
       terminate(self());
     }
@@ -2010,19 +2192,17 @@ void Slave::shutdownExecutorTimeout(
   }
 
   switch (executor->state) {
-    case Executor::TERMINATED: {
+    case Executor::TERMINATED:
       LOG(INFO) << "Executor '" << executorId
                 << "' of framework " << frameworkId
                 << " has already terminated";
       break;
-    }
-    case Executor::TERMINATING: {
+    case Executor::TERMINATING:
       LOG(INFO) << "Killing executor '" << executor->id
                 << "' of framework " << framework->id;
 
       dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
       break;
-    }
     default:
       LOG(FATAL) << "Executor '" << executor->id
                  << "' of framework " << framework->id
@@ -2074,18 +2254,16 @@ void Slave::registerExecutorTimeout(
   }
 
   switch (executor->state) {
-    case Executor::RUNNING: {
+    case Executor::RUNNING:
       // Executor has registered. Ignore the registration timeout.
       break;
-    }
     case Executor::TERMINATING:
-    case Executor::TERMINATED: {
+    case Executor::TERMINATED:
       LOG(INFO) << "Ignoring registration timeout for executor '" << executorId
                 << "' of framework " << frameworkId
                 << " because the executor is terminating/terminated";
       break;
-    }
-    case Executor::REGISTERING: {
+    case Executor::REGISTERING:
       LOG(INFO) << "Terminating executor " << executor->id
                 << " of framework " << framework->id
                 << " because it did not register within "
@@ -2096,7 +2274,6 @@ void Slave::registerExecutorTimeout(
       // Immediately kill the executor.
       dispatch(isolator, &Isolator::killExecutor, framework->id, executor->id);
       break;
-    }
     default:
       LOG(FATAL) << "Executor '" << executor->id
                  << "' of framework " << framework->id
@@ -2321,7 +2498,7 @@ Framework::Framework(
     pid(_pid),
     completedExecutors(MAX_COMPLETED_EXECUTORS_PER_FRAMEWORK)
 {
-  if (info.checkpoint()) {
+  if (info.checkpoint() && slave->state != slave->RECOVERING) {
     // Checkpoint the framework info.
     string path = paths::getFrameworkInfoPath(
         paths::getMetaRootDir(slave->flags.work_dir),
@@ -2557,9 +2734,8 @@ Executor::Executor(
     resources(_info.resources()),
     completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
 {
-  if (checkpoint) {
-    CHECK_NOTNULL(slave);
-
+  CHECK_NOTNULL(slave);
+  if (checkpoint && slave->state != slave->RECOVERING) {
     // Checkpoint the executor info.
     const string& path = paths::getExecutorInfoPath(
         paths::getMetaRootDir(slave->flags.work_dir),
@@ -2709,6 +2885,17 @@ std::ostream& operator << (std::ostream&
   }
 }
 
+
+std::ostream& operator << (std::ostream& stream, Slave::State state) {
+  switch (state) {
+    case Slave::RECOVERING:   return stream << "RECOVERING";
+    case Slave::DISCONNECTED: return stream << "DISCONNECTED";
+    case Slave::RUNNING:      return stream << "RUNNING";
+    case Slave::TERMINATING:  return stream << "TERMINATING";
+    default:                  return stream << "UNKNOWN";
+  }
+}
+
 } // namespace slave {
 } // namespace internal {
 } // namespace mesos {

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468775&r1=1468774&r2=1468775&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:05 2013
@@ -91,7 +91,7 @@ public:
   void masterDetectionFailure();
   void registered(const SlaveID& slaveId);
   void reregistered(const SlaveID& slaveId);
-  void doReliableRegistration(const Future<Nothing>& future);
+  void doReliableRegistration();
 
   void runTask(
       const FrameworkInfo& frameworkInfo,
@@ -180,6 +180,13 @@ public:
   // exited.
   void shutdownExecutor(Framework* framework, Executor* executor);
 
+  enum State {
+    RECOVERING,   // Slave is doing recovery.
+    DISCONNECTED, // Slave is not connected to the master.
+    RUNNING,      // Slave has (re-)registered.
+    TERMINATING,  // Slave is shutting down.
+  } state;
+
 protected:
   virtual void initialize();
   virtual void finalize();
@@ -285,24 +292,15 @@ private:
 
   double startTime;
 
-  // TODO(Vinod): Add 'state' to slave instead of capturing the
-  // semantics of waiting for registration ('connecting') and
-  // shutting down ('halting') in boolean variables.
-  bool connected; // Flag to indicate if slave is registered.
-
   GarbageCollector gc;
   ResourceMonitor monitor;
 
-  state::SlaveState state;
-
   StatusUpdateManager* statusUpdateManager;
 
   // Flag to indicate if recovery, including reconciling (i.e., reconnect/kill)
   // with executors is finished.
   Promise<Nothing> recovered;
 
-  bool halting; // Flag to indicate if the slave is shutting down.
-
   // Root meta directory containing checkpointed data.
   const std::string metaDir;
 };