Posted to commits@mesos.apache.org by bm...@apache.org on 2013/04/30 03:31:54 UTC

svn commit: r1477444 - in /incubator/mesos/trunk/src: master/master.cpp master/master.hpp tests/fault_tolerance_tests.cpp

Author: bmahler
Date: Tue Apr 30 01:31:53 2013
New Revision: 1477444

URL: http://svn.apache.org/r1477444
Log:
Fixed the master to shut down deactivated slaves that send further
messages to the master.

Review: https://reviews.apache.org/r/10734
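
In outline, each slave-originated handler now applies the same guard (a
condensed sketch of the pattern, not the literal patch; the actual code
is in the diff below, and 'deactivatedSlaves' is the master's record of
slaves it has removed):

    Slave* slave = getSlave(slaveId);
    if (slave == NULL) {
      if (deactivatedSlaves.contains(pid)) {
        // The slave was deactivated (e.g. after a partition) and its
        // tasks were already reported LOST, so ask it to shut down.
        send(pid, ShutdownMessage());
      } else {
        LOG(WARNING) << "Ignoring message from unknown slave " << pid;
      }
      return;  // Drop the message either way.
    }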

Modified:
    incubator/mesos/trunk/src/master/master.cpp
    incubator/mesos/trunk/src/master/master.hpp
    incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp

Modified: incubator/mesos/trunk/src/master/master.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.cpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.cpp (original)
+++ incubator/mesos/trunk/src/master/master.cpp Tue Apr 30 01:31:53 2013
@@ -346,13 +346,6 @@ void Master::initialize()
       &StatusUpdateMessage::update,
       &StatusUpdateMessage::pid);
 
-  install<ExecutorToFrameworkMessage>(
-      &Master::executorMessage,
-      &ExecutorToFrameworkMessage::slave_id,
-      &ExecutorToFrameworkMessage::framework_id,
-      &ExecutorToFrameworkMessage::executor_id,
-      &ExecutorToFrameworkMessage::data);
-
   install<ExitedExecutorMessage>(
       &Master::exitedExecutor,
       &ExitedExecutorMessage::slave_id,
@@ -1049,92 +1042,68 @@ void Master::statusUpdate(const StatusUp
 {
   const TaskStatus& status = update.status();
 
-  LOG(INFO) << "Status update from " << from
+  // NOTE: We cannot use 'from' here to identify the slave as the
+  // message is now sent by the StatusUpdateManagerProcess. Only
+  // 'pid' can be used to identify the slave.
+  LOG(INFO) << "Status update from " << pid
             << ": task " << status.task_id()
             << " of framework " << update.framework_id()
             << " is now in state " << status.state();
 
   Slave* slave = getSlave(update.slave_id());
-  if (slave != NULL) {
-    Framework* framework = getFramework(update.framework_id());
-    if (framework != NULL) {
-      // Pass on the (transformed) status update to the framework.
-      StatusUpdateMessage message;
-      message.mutable_update()->MergeFrom(update);
-      message.set_pid(pid);
-      send(framework->pid, message);
+  if (slave == NULL) {
+    if (deactivatedSlaves.contains(pid)) {
+      // If the slave is deactivated, we have already informed
+      // frameworks that its tasks were LOST, so the slave should
+      // shut down.
+      LOG(WARNING) << "Ignoring status update from deactivated slave " << pid
+                   << " with id " << update.slave_id() << " ; asking slave "
+                   << " to shutdown";
+      send(pid, ShutdownMessage());
+    } else {
+      LOG(WARNING) << "Ignoring status update from unknown slave " << pid
+                   << " with id " << update.slave_id();
+    }
+    stats.invalidStatusUpdates++;
+    return;
+  }
 
-      // Lookup the task and see if we need to update anything locally.
-      Task* task = slave->getTask(update.framework_id(), status.task_id());
-      if (task != NULL) {
-        task->set_state(status.state());
-
-        // Handle the task appropriately if it's terminated.
-        if (status.state() == TASK_FINISHED ||
-            status.state() == TASK_FAILED ||
-            status.state() == TASK_KILLED ||
-            status.state() == TASK_LOST) {
+  CHECK(!deactivatedSlaves.contains(pid));
 
-          removeTask(task);
-        }
+  Framework* framework = getFramework(update.framework_id());
+  if (framework == NULL) {
+    LOG(WARNING) << "Ignoring status update from " << pid << " ("
+                 << slave->info.hostname() << "): error, couldn't lookup "
+                 << "framework " << update.framework_id();
+    stats.invalidStatusUpdates++;
+    return;
+  }
 
-        stats.tasks[status.state()]++;
+  // Pass on the (transformed) status update to the framework.
+  StatusUpdateMessage message;
+  message.mutable_update()->MergeFrom(update);
+  message.set_pid(pid);
+  send(framework->pid, message);
 
-        stats.validStatusUpdates++;
-      } else {
-        LOG(WARNING) << "Status update from " << from << " ("
-                     << slave->info.hostname() << "): error, couldn't lookup "
-                     << "task " << status.task_id();
-        stats.invalidStatusUpdates++;
-      }
-    } else {
-      LOG(WARNING) << "Status update from " << from << " ("
-                   << slave->info.hostname() << "): error, couldn't lookup "
-                   << "framework " << update.framework_id();
-      stats.invalidStatusUpdates++;
-    }
-  } else {
-    LOG(WARNING) << "Status update from " << from
-                 << ": error, couldn't lookup slave "
-                 << update.slave_id();
+  // Lookup the task and see if we need to update anything locally.
+  Task* task = slave->getTask(update.framework_id(), status.task_id());
+  if (task == NULL) {
+    LOG(WARNING) << "Status update from " << pid << " ("
+                 << slave->info.hostname() << "): error, couldn't lookup "
+                 << "task " << status.task_id();
     stats.invalidStatusUpdates++;
+    return;
   }
-}
 
+  task->set_state(status.state());
 
-void Master::executorMessage(const SlaveID& slaveId,
-                             const FrameworkID& frameworkId,
-                             const ExecutorID& executorId,
-                             const string& data)
-{
-  Slave* slave = getSlave(slaveId);
-  if (slave != NULL) {
-    Framework* framework = getFramework(frameworkId);
-    if (framework != NULL) {
-      LOG(INFO) << "Sending framework message from slave " << slaveId << " ("
-                << slave->info.hostname() << ")"
-                << " to framework " << frameworkId;
-      ExecutorToFrameworkMessage message;
-      message.mutable_slave_id()->MergeFrom(slaveId);
-      message.mutable_framework_id()->MergeFrom(frameworkId);
-      message.mutable_executor_id()->MergeFrom(executorId);
-      message.set_data(data);
-      send(framework->pid, message);
-
-      stats.validFrameworkMessages++;
-    } else {
-      LOG(WARNING) << "Cannot send framework message from slave "
-                   << slaveId << " (" << slave->info.hostname()
-                   << ") to framework " << frameworkId
-                   << " because framework does not exist";
-      stats.invalidFrameworkMessages++;
-    }
-  } else {
-    LOG(WARNING) << "Cannot send framework message from slave "
-                 << slaveId << " to framework " << frameworkId
-                 << " because slave does not exist";
-    stats.invalidFrameworkMessages++;
+  // Handle the task appropriately if it's terminated.
+  if (protobuf::isTerminalState(status.state())) {
+    removeTask(task);
   }
+
+  stats.tasks[status.state()]++;
+  stats.validStatusUpdates++;
 }
 
 
@@ -1146,41 +1115,55 @@ void Master::exitedExecutor(const SlaveI
   // Only update master's internal data structures here for proper accounting.
   // The TASK_LOST updates are handled by the slave.
   Slave* slave = getSlave(slaveId);
-  if (slave != NULL) {
-    // Tell the allocator about the recovered resources.
-    if (slave->hasExecutor(frameworkId, executorId)) {
-      ExecutorInfo executor = slave->executors[frameworkId][executorId];
-
-      LOG(INFO) << "Executor " << executorId
-                << " of framework " << frameworkId
-                << " on slave " << slaveId
-                << " (" << slave->info.hostname() << ")"
-                << " exited with status " << status;
-
-      allocator->resourcesRecovered(frameworkId,
-                                    slaveId,
-                                    Resources(executor.resources()));
-
-      // Remove executor from slave and framework.
-      slave->removeExecutor(frameworkId, executorId);
+  if (slave == NULL) {
+    if (deactivatedSlaves.contains(from)) {
+      // If the slave is deactivated, we have already informed
+      // frameworks that its tasks were LOST, so the slave should
+      // shut down.
+      LOG(WARNING) << "Ignoring exited executor '" << executorId
+                   << "' of framework " << frameworkId
+                   << " on deactivated slave " << slaveId
+                   << " ; asking slave to shutdown";
+      reply(ShutdownMessage());
     } else {
-      LOG(WARNING) << "Ignoring unknown exited executor "
-                   << executorId << " on slave " << slaveId
-                   << " (" << slave->info.hostname() << ")";
+      LOG(WARNING) << "Ignoring exited executor '" << executorId
+                   << "' of framework " << frameworkId
+                   << " on unknown slave " << slaveId;
     }
+    return;
+  }
 
-    Framework* framework = getFramework(frameworkId);
-    if (framework != NULL) {
-      framework->removeExecutor(slave->id, executorId);
+  CHECK(!deactivatedSlaves.contains(from));
 
-      // TODO(benh): Send the framework it's executor's exit status?
-      // Or maybe at least have something like
-      // Scheduler::executorLost?
-    }
-  } else {
-    LOG(INFO) << "Ignoring unknown exited executor " << executorId
+  // Tell the allocator about the recovered resources.
+  if (slave->hasExecutor(frameworkId, executorId)) {
+    ExecutorInfo executor = slave->executors[frameworkId][executorId];
+
+    LOG(INFO) << "Executor " << executorId
               << " of framework " << frameworkId
-              << " on unknown slave " << slaveId;
+              << " on slave " << slaveId
+              << " (" << slave->info.hostname() << ")"
+              << " exited with status " << status;
+
+    allocator->resourcesRecovered(frameworkId,
+        slaveId,
+        Resources(executor.resources()));
+
+    // Remove executor from slave and framework.
+    slave->removeExecutor(frameworkId, executorId);
+  } else {
+    LOG(WARNING) << "Ignoring unknown exited executor "
+                 << executorId << " on slave " << slaveId
+                 << " (" << slave->info.hostname() << ")";
+  }
+
+  Framework* framework = getFramework(frameworkId);
+  if (framework != NULL) {
+    framework->removeExecutor(slave->id, executorId);
+
+    // TODO(benh): Send the framework its executor's exit status?
+    // Or maybe at least have something like
+    // Scheduler::executorLost?
   }
 }
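
The protobuf::isTerminalState() helper introduced above replaces the
explicit four-state check from the removed code. Assuming it covers
the same states (it is presumably defined in common/protobuf_utils.hpp,
included by the tests below), it amounts to this sketch:

    // A task state is terminal once it can no longer transition.
    inline bool isTerminalState(const TaskState& state)
    {
      return state == TASK_FINISHED ||
             state == TASK_FAILED ||
             state == TASK_KILLED ||
             state == TASK_LOST;
    }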
 

Modified: incubator/mesos/trunk/src/master/master.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/master/master.hpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/master/master.hpp (original)
+++ incubator/mesos/trunk/src/master/master.hpp Tue Apr 30 01:31:53 2013
@@ -107,10 +107,6 @@ public:
                        const std::vector<Task>& tasks);
   void unregisterSlave(const SlaveID& slaveId);
   void statusUpdate(const StatusUpdate& update, const UPID& pid);
-  void executorMessage(const SlaveID& slaveId,
-                       const FrameworkID& frameworkId,
-                       const ExecutorID& executorId,
-                       const std::string& data);
   void exitedExecutor(const SlaveID& slaveId,
                       const FrameworkID& frameworkId,
                       const ExecutorID& executorId,

Modified: incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp?rev=1477444&r1=1477443&r2=1477444&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/fault_tolerance_tests.cpp Tue Apr 30 01:31:53 2013
@@ -25,13 +25,17 @@
 #include <vector>
 
 #include <mesos/executor.hpp>
+#include <mesos/mesos.hpp>
 #include <mesos/scheduler.hpp>
 
 #include <process/future.hpp>
 #include <process/gmock.hpp>
+#include <process/pid.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
 
+#include <stout/stringify.hpp>
+
 #include "common/protobuf_utils.hpp"
 
 #include "detector/detector.hpp"
@@ -42,6 +46,7 @@
 #include "master/hierarchical_allocator_process.hpp"
 #include "master/master.hpp"
 
+#include "slave/isolator.hpp"
 #include "slave/slave.hpp"
 
 #include "tests/utils.hpp"
@@ -55,6 +60,7 @@ using mesos::internal::master::Allocator
 using mesos::internal::master::HierarchicalDRFAllocatorProcess;
 using mesos::internal::master::Master;
 
+using mesos::internal::slave::Isolator;
 using mesos::internal::slave::Slave;
 using mesos::internal::slave::STATUS_UPDATE_RETRY_INTERVAL;
 
@@ -62,6 +68,7 @@ using process::Clock;
 using process::Future;
 using process::Message;
 using process::PID;
+using process::UPID;
 
 using std::string;
 using std::map;
@@ -220,10 +227,6 @@ TEST_F(FaultToleranceClusterTest, Partit
   Try<PID<Master> > master = cluster.masters.start();
   ASSERT_SOME(master);
 
-  MockExecutor exec;
-  Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
-  ASSERT_SOME(slave);
-
   // Allow the master to PING the slave, but drop all PONG messages
   // from the slave. Note that we don't match on the master / slave
   // PIDs because it's actually the SlaveObserver Process that sends
@@ -231,7 +234,9 @@ TEST_F(FaultToleranceClusterTest, Partit
   Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
   DROP_MESSAGES(Eq("PONG"), _, _);
 
-  BasicMasterDetector detector(master.get(), slave.get(), true);
+  MockExecutor exec;
+  Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+  ASSERT_SOME(slave);
 
   MockScheduler sched;
   MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
@@ -287,7 +292,7 @@ TEST_F(FaultToleranceClusterTest, Partit
 
   // Drop the first shutdown message from the master (simulated
   // partition), allow the second shutdown message to pass when
-  // the slave re-registers,
+  // the slave re-registers.
   Future<ShutdownMessage> shutdownSlave =
     DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
 
@@ -312,6 +317,7 @@ TEST_F(FaultToleranceClusterTest, Partit
     }
     ping = FUTURE_MESSAGE(Eq("PING"), _, _);
     Clock::advance(master::SLAVE_PING_TIMEOUT);
+    Clock::settle();
   }
 
   Clock::advance(master::SLAVE_PING_TIMEOUT);
@@ -357,6 +363,259 @@ TEST_F(FaultToleranceClusterTest, Partit
 }
 
 
+// The purpose of this test is to ensure that when a slave has been
+// removed from the master and then attempts to send status updates,
+// the master sends it a ShutdownMessage. Why? Because during a network
+// partition, the master removes the partitioned slave, transitioning
+// its tasks to LOST. Once the partition heals, the slave may attempt
+// to send updates if it was unaware that the master deactivated it.
+// We've already notified frameworks that these tasks were LOST, so
+// the slave must shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveStatusUpdates)
+{
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  MockExecutor exec;
+  Try<PID<Slave> > slave = cluster.slaves.start(DEFAULT_EXECUTOR_ID, &exec);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+  SlaveID slaveId = slaveRegisteredMessage.get().slave_id();
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(Return());
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  // Drop the first shutdown message from the master (simulated
+  // partition), allow the second shutdown message to pass when
+  // the slave sends an update.
+  Future<ShutdownMessage> shutdownSlave =
+    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  // Now, induce a partition of the slave by having the master
+  // time out the slave.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+    Clock::settle();
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // Wait for the master to attempt to shut down the slave.
+  AWAIT_READY(shutdownSlave);
+
+  // The master will notify the framework that the slave was lost.
+  AWAIT_READY(slaveLost);
+
+  shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  // At this point, the slave still thinks it's registered, so we
+  // simulate a status update coming from the slave.
+  StatusUpdateMessage statusUpdate;
+  statusUpdate.set_pid(stringify(slave.get()));
+  statusUpdate.mutable_update()->mutable_framework_id()->set_value(
+      frameworkId.get().value());
+  statusUpdate.mutable_update()->mutable_executor_id()->set_value("executor");
+  statusUpdate.mutable_update()->mutable_slave_id()->set_value(slaveId.value());
+  statusUpdate.mutable_update()->mutable_status()->mutable_task_id()->set_value(
+      "task_id");
+  statusUpdate.mutable_update()->mutable_status()->set_state(TASK_RUNNING);
+  statusUpdate.mutable_update()->set_timestamp(Clock::now());
+  statusUpdate.mutable_update()->set_uuid(stringify(UUID::random()));
+  process::post(master.get(), statusUpdate);
+
+  // The master should shut down the slave upon receiving the update.
+  AWAIT_READY(shutdownSlave);
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  cluster.shutdown();
+}
+
+
+// The purpose of this test is to ensure that when a slave has been
+// removed from the master and then attempts to send exited executor
+// messages, the master sends it a ShutdownMessage. Why? Because
+// during a network partition, the master removes the partitioned
+// slave, transitioning its tasks to LOST. Once the partition heals,
+// the slave may attempt to send exited executor messages if it was
+// unaware that the master deactivated it. We've already notified
+// frameworks that the tasks under the executors were LOST, so the
+// slave must shut down.
+TEST_F(FaultToleranceClusterTest, PartitionedSlaveExitedExecutor)
+{
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  // Allow the master to PING the slave, but drop all PONG messages
+  // from the slave. Note that we don't match on the master / slave
+  // PIDs because it's actually the SlaveObserver Process that sends
+  // the pings.
+  Future<Message> ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+  DROP_MESSAGES(Eq("PONG"), _, _);
+
+  MockExecutor exec;
+  TestingIsolator* isolator = new TestingIsolator(DEFAULT_EXECUTOR_ID, &exec);
+  process::spawn(isolator);
+  Try<PID<Slave> > slave = cluster.slaves.start(isolator);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+  AWAIT_READY(offers);
+  ASSERT_NE(0u, offers.get().size());
+
+  // Launch a task. This allows us to have the slave send an
+  // ExitedExecutorMessage.
+  TaskID taskId;
+  taskId.set_value("1");
+
+  TaskInfo task;
+  task.set_name("");
+  task.mutable_task_id()->MergeFrom(taskId);
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+  task.mutable_executor()->mutable_command()->set_value("sleep 60");
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  // Set up the expectations for launching the task.
+  EXPECT_CALL(exec, registered(_, _, _, _));
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Drop all the status updates from the slave, so that we can
+  // ensure the ExitedExecutorMessage is what triggers the slave
+  // shutdown.
+  DROP_PROTOBUFS(StatusUpdateMessage(), _, master.get());
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  // Drop the first shutdown message from the master (simulated
+  // partition) and allow the second shutdown message to pass when
+  // triggered by the ExitedExecutorMessage.
+  Future<ShutdownMessage> shutdownSlave =
+    DROP_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  Future<TaskStatus> lostStatus;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&lostStatus));
+
+  Future<Nothing> slaveLost;
+  EXPECT_CALL(sched, slaveLost(&driver, _))
+    .WillOnce(FutureSatisfy(&slaveLost));
+
+  Clock::pause();
+
+  // Now, induce a partition of the slave by having the master
+  // time out the slave.
+  uint32_t pings = 0;
+  while (true) {
+    AWAIT_READY(ping);
+    pings++;
+    if (pings == master::MAX_SLAVE_PING_TIMEOUTS) {
+      break;
+    }
+    ping = FUTURE_MESSAGE(Eq("PING"), _, _);
+    Clock::advance(master::SLAVE_PING_TIMEOUT);
+    Clock::settle();
+  }
+
+  Clock::advance(master::SLAVE_PING_TIMEOUT);
+  Clock::settle();
+
+  // The master will have notified the framework of the lost task.
+  AWAIT_READY(lostStatus);
+  EXPECT_EQ(TASK_LOST, lostStatus.get().state());
+
+  // Wait for the master to attempt to shut down the slave.
+  AWAIT_READY(shutdownSlave);
+
+  // The master will notify the framework that the slave was lost.
+  AWAIT_READY(slaveLost);
+
+  shutdownSlave = FUTURE_PROTOBUF(ShutdownMessage(), _, slave.get());
+
+  // Induce an ExitedExecutorMessage from the slave.
+  dispatch(isolator,
+           &Isolator::killExecutor,
+           frameworkId.get(),
+           DEFAULT_EXECUTOR_INFO.executor_id());
+
+  // Upon receiving the message, the master will shut down the slave.
+  AWAIT_READY(shutdownSlave);
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  cluster.shutdown();
+
+  // TODO(benh): Terminate and wait for the isolator once the slave
+  // is no longer doing so.
+  // process::terminate(isolator);
+  // process::wait(isolator);
+  delete isolator;
+}
+
+
 TEST_F(FaultToleranceTest, SchedulerFailover)
 {
   ASSERT_TRUE(GTEST_IS_THREADSAFE);