You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:02 UTC

svn commit: r1468774 - in /incubator/mesos/trunk/src: slave/process_isolator.cpp slave/slave.cpp slave/slave.hpp slave/state.cpp tests/slave_recovery_tests.cpp

Author: vinodkone
Date: Wed Apr 17 07:09:01 2013
New Revision: 1468774

URL: http://svn.apache.org/r1468774
Log:
Fixed the slave so that, when shutting down, it waits for all executors
to exit and deletes the "latest" slave symlink.

Review: https://reviews.apache.org/r/10233

Modified:
    incubator/mesos/trunk/src/slave/process_isolator.cpp
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/slave/state.cpp
    incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp

Modified: incubator/mesos/trunk/src/slave/process_isolator.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/process_isolator.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/process_isolator.cpp (original)
+++ incubator/mesos/trunk/src/slave/process_isolator.cpp Wed Apr 17 07:09:01 2013
@@ -455,8 +455,8 @@ void ProcessIsolator::processExited(pid_
       ProcessInfo* info = infos[frameworkId][executorId];
 
       if (info->pid.isSome() && info->pid.get() == pid) {
-        LOG(INFO) << "Telling slave of lost executor " << executorId
-                  << " of framework " << frameworkId;
+        LOG(INFO) << "Telling slave of terminated executor '" << executorId
+                  << "' of framework " << frameworkId;
 
         dispatch(slave,
                  &Slave::executorTerminated,

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:01 2013
@@ -421,8 +421,13 @@ void Slave::_initialize(const Future<Not
     }
   }
 
-  // Signal recovery.
-  recovered.set(Nothing());
+  recovered.set(Nothing()); // Signal recovery.
+
+  // Terminate slave, if it has no active frameworks and is started
+  // in 'cleanup' mode.
+  if (frameworks.empty() && flags.recover == "cleanup") {
+    terminate(self());
+  }
 }
 
 
@@ -439,16 +444,22 @@ void Slave::finalize()
     // immediately). Of course, this still isn't sufficient
     // because those status updates might get lost and we won't
     // resend them unless we build that into the system.
-    // NOTE: We shut down the framework if either
-    // 1: The slave is asked to shutdown (halting = true) or
-    // 2: The framework has disabled checkpointing.
-    if (halting || !frameworks[frameworkId]->info.checkpoint()) {
+    // NOTE: We shut down the framework only if it has disabled
+    // checkpointing. This is because slave recovery tests terminate
+    // the slave to simulate slave restart.
+    if (!frameworks[frameworkId]->info.checkpoint()) {
       shutdownFramework(frameworkId);
     }
   }
 
+  if (flags.checkpoint && (halting || flags.recover == "cleanup")) {
+    // We remove the "latest" symlink in meta directory, so that the
+    // slave doesn't recover the state when it restarts and registers
+    // as a new slave with the master.
+    CHECK_SOME(os::rm(paths::getLatestSlavePath(metaDir)));
+  }
+
   // Stop the isolator.
-  // TODO(vinod): Wait until all the executors have terminated.
   terminate(isolator);
   wait(isolator);
 }
@@ -470,7 +481,21 @@ void Slave::shutdown()
 
   halting = true;
 
-  terminate(self());
+  if (frameworks.empty()) { // Terminate slave if there are no frameworks.
+    terminate(self());
+  } else {
+    // NOTE: The slave will terminate after all
+    // executors have terminated.
+    // TODO(vinod): Wait until all updates have been acknowledged.
+    // This is tricky without persistent state at master because the
+    // slave might wait forever for status update acknowledgements,
+    // since it cannot reliably know when a framework has shut down.
+    // A short-term fix could be to wait for a certain time for ACKs
+    // and then shutdown.
+    foreachkey (const FrameworkID& frameworkId, utils::copy(frameworks)) {
+      shutdownFramework(frameworkId);
+    }
+  }
 }
 
 
@@ -903,15 +928,25 @@ void Slave::shutdownFramework(const Fram
       framework->state = Framework::TERMINATING;
 
       // Shut down all executors of this framework.
-      // Note that the framework and its corresponding executors are
-      // removed from the frameworks map by shutdownExecutorTimeout() or
-      // executorTerminated().
-      foreachvalue (Executor* executor, framework->executors) {
-        shutdownExecutor(framework, executor);
+      foreachvalue (Executor* executor, utils::copy(framework->executors)) {
+        CHECK(executor->state == Executor::REGISTERING ||
+              executor->state == Executor::RUNNING ||
+              executor->state == Executor::TERMINATING ||
+              executor->state == Executor::TERMINATED)
+          << executor->state;
+
+        if (executor->state == Executor::REGISTERING ||
+            executor->state == Executor::RUNNING) {
+          shutdownExecutor(framework, executor);
+        } else if (executor->state == Executor::TERMINATED) {
+          // NOTE: We call remove here to ensure we can remove an
+          // executor (of a terminating framework) that is terminated
+          // but waiting for acknowledgements.
+          remove(framework, executor);
+        } else {
+          // Executor is terminating. Ignore.
+        }
       }
-
-      // Close all status update streams for this framework.
-      statusUpdateManager->cleanup(frameworkId);
       break;
     }
     default:
@@ -1101,8 +1136,11 @@ void Slave::_statusUpdateAcknowledgement
 
   executor->updates.remove(taskId, uuid);
 
-  // Cleanup the executor and framework, if possible.
-  cleanup(framework, executor);
+  // Remove the executor if it has terminated and there are no more
+  // pending updates.
+  if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+    remove(framework, executor);
+  }
 }
 
 
@@ -1464,7 +1502,7 @@ void Slave::_statusUpdate(
     const Option<UPID>& pid)
 {
   if (!future.isReady()) {
-    LOG(FATAL) << "Failed to handle status update " << update
+    LOG(FATAL) << "Failed to handle status update " << update << ": "
                << (future.isFailed() ? future.failure() : "future discarded");
     return;
   }
@@ -1541,11 +1579,11 @@ void Slave::ping(const UPID& from, const
 
 void Slave::exited(const UPID& pid)
 {
-  LOG(INFO) << "Process exited: " << from;
+  LOG(INFO) << pid << " exited";
 
   if (master == pid) {
-    LOG(WARNING) << "WARNING! Master disconnected!"
-                 << " Waiting for a new master to be elected.";
+    LOG(WARNING) << "Master disconnected!"
+                 << " Waiting for a new master to be elected";
     // TODO(benh): After so long waiting for a master, commit suicide.
   }
 }
@@ -1659,10 +1697,11 @@ void Slave::executorTerminated(
             << "' of framework " << frameworkId
             << (WIFEXITED(status)
                 ? " has exited with status "
-                : " has terminated with signal ")
+                : " has terminated with signal '")
             << (WIFEXITED(status)
                 ? stringify(WEXITSTATUS(status))
-                : strsignal(WTERMSIG(status)));
+                : strsignal(WTERMSIG(status)))
+            << "'";
 
   Framework* framework = getFramework(frameworkId);
   if (framework == NULL) {
@@ -1694,6 +1733,8 @@ void Slave::executorTerminated(
       monitor.unwatch(frameworkId, executorId)
         .onAny(lambda::bind(_unwatch, lambda::_1, frameworkId, executorId));
 
+      // TODO(vinod): If there are no pending tasks or if the framework
+      // is terminating, this variable will not be properly set.
       bool isCommandExecutor = false;
 
       // Transition all live tasks to TASK_LOST/TASK_FAILED.
@@ -1770,8 +1811,12 @@ void Slave::executorTerminated(
         send(master, message);
       }
 
-      // Cleanup the executor and framework, if possible.
-      cleanup(framework, executor);
+      // Remove the executor if either there are no pending updates
+      // or the framework is terminating.
+      if (executor->updates.empty() ||
+          framework->state == Framework::TERMINATING) {
+        remove(framework, executor);
+      }
       break;
     }
     default:
@@ -1783,109 +1828,104 @@ void Slave::executorTerminated(
 }
 
 
-void Slave::cleanup(Framework* framework, Executor* executor)
+void Slave::remove(Framework* framework, Executor* executor)
 {
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(executor);
 
-  // Cleanup this executor if it has terminated and either has no
+  LOG(INFO) << "Cleaning up executor '" << executor->id << "'"
+            << " of framework " << framework->id;
+
+  CHECK(framework->state == Framework::RUNNING ||
+        framework->state == Framework::TERMINATING);
+
+  // Check that this executor has terminated and either has no
   // pending updates or the framework is terminating. We don't
   // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
-  // TODO(vinod): Use switch statement.
-  if (executor->state == Executor::TERMINATED &&
-      (executor->updates.empty() ||
-       framework->state == Framework::TERMINATING)) {
-
-    // Schedule the executor run work directory to get garbage collected.
-    // TODO(vinod): Also schedule the top level executor work directory.
-    gc.schedule(flags.gc_delay, executor->directory)
-      .onAny(defer(self(), &Self::detachFile, params::_1, executor->directory));
-
-    if (executor->checkpoint) {
-      // Schedule the executor run meta directory to get garbage collected.
-      // TODO(vinod): Also schedule the top level executor meta directory.
-      const string& executorMetaDir = paths::getExecutorRunPath(
-          metaDir,
-          info.id(),
-          framework->id,
-          executor->id,
-          executor->uuid);
+  CHECK(executor->state == Executor::TERMINATED &&
+        (executor->updates.empty() ||
+         framework->state == Framework::TERMINATING));
+
+  // Schedule the executor run work directory to get garbage collected.
+  // TODO(vinod): Also schedule the top level executor work directory.
+  gc.schedule(flags.gc_delay, executor->directory).onAny(
+      defer(self(), &Self::detachFile, params::_1, executor->directory));
 
-      gc.schedule(flags.gc_delay, executorMetaDir)
-        .onAny(defer(self(), &Self::detachFile, params::_1, executorMetaDir));
-    }
+  if (executor->checkpoint) {
+    // Schedule the executor run meta directory to get garbage collected.
+    // TODO(vinod): Also schedule the top level executor meta directory.
+    const string& executorMetaDir = paths::getExecutorRunPath(
+        metaDir,
+        info.id(),
+        framework->id,
+        executor->id,
+        executor->uuid);
 
-    framework->destroyExecutor(executor->id);
+    gc.schedule(flags.gc_delay, executorMetaDir).onAny(
+        defer(self(), &Self::detachFile, params::_1, executorMetaDir));
   }
 
-  // Cleanup if this framework has no executors running.
-  if (framework->executors.empty()) {
-    // Schedule the framework work directory to get garbage collected.
-    const string& frameworkDir =
-      paths::getFrameworkPath(flags.work_dir, info.id(), framework->id);
+  framework->destroyExecutor(executor->id);
 
-    gc.schedule(flags.gc_delay, frameworkDir)
-      .onAny(defer(self(), &Self::detachFile, params::_1, frameworkDir));
-
-    if (framework->info.checkpoint()) {
-      // Schedule the framework meta directory to get garbage collected.
-      const string& frameworkMetaDir = paths::getFrameworkPath(
-          metaDir, info.id(), framework->id);
-
-      gc.schedule(flags.gc_delay, frameworkMetaDir)
-        .onAny(defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
-    }
-
-    frameworks.erase(framework->id);
-
-    // Pass ownership of the framework pointer.
-    completedFrameworks.push_back(Owned<Framework>(framework));
-  }
-
-  // If this slave is in 'recover=cleanup' mode, exit after all
-  // executors have been removed.
-  if (flags.recover == "cleanup" && frameworks.size() == 0) {
-    cleanup();
+  // Remove this framework if it has no executors running.
+  if (framework->executors.empty()) {
+    remove(framework);
   }
 }
 
 
-void Slave::cleanup()
+void Slave::remove(Framework* framework)
 {
-  CHECK(flags.recover == "cleanup");
+  CHECK_NOTNULL(framework);
 
-  LOG(INFO) << "Slave is shutting down because it is started with "
-            << " --recover==cleanup and all executors have terminated!";
+  LOG(INFO) << "Cleaning up framework " << framework->id;
 
-  string archiveDir = paths::getArchiveDir(flags.work_dir);
-  string metaDir = paths::getMetaRootDir(flags.work_dir);
+  CHECK(framework->state == Framework::RUNNING ||
+        framework->state == Framework::TERMINATING);
 
-  // Archive and delete the meta directory, to allow incompatible upgrades.
-  LOG(INFO) << "Archiving and deleting the meta directory '"
-            << metaDir << "' to allow incompatible upgrade!";
+  CHECK(framework->executors.empty());
 
-  // Create the archive directory, if it doesn't exist.
-  Try<Nothing> result = os::mkdir(archiveDir);
-  if (result.isSome()) {
-    result = os::tar(
-        metaDir, path::join(archiveDir, info.id().value() + ".tar.gz"));
+  // Close all status update streams for this framework.
+  statusUpdateManager->cleanup(framework->id);
 
-    if (result.isError()) {
-      LOG(ERROR) << "Failed to archive meta directory '"
-                 << archiveDir << "': " << result.error();
+  // Schedule the framework work directory to get garbage collected.
+  const string& frameworkDir = paths::getFrameworkPath(
+      flags.work_dir,
+      info.id(),
+      framework->id);
+
+  gc.schedule(flags.gc_delay, frameworkDir).onAny(
+      defer(self(), &Self::detachFile, params::_1, frameworkDir));
+
+  if (framework->info.checkpoint()) {
+    // Schedule the framework meta directory to get garbage collected.
+    const string& frameworkMetaDir = paths::getFrameworkPath(
+        metaDir,
+        info.id(),
+        framework->id);
+
+    gc.schedule(flags.gc_delay, frameworkMetaDir).onAny(
+        defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
+  }
+
+  frameworks.erase(framework->id);
+
+  // Pass ownership of the framework pointer.
+  completedFrameworks.push_back(Owned<Framework>(framework));
+
+  if (frameworks.empty()) {
+    // Terminate the slave if
+    // 1) it's being shut down or
+    // 2) it's started in cleanup mode and recovery finished.
+    // TODO(vinod): Instead of doing it this way, shutdownFramework()
+    // and shutdownExecutor() could return Futures and a slave could
+    // shutdown when all the Futures are satisfied (e.g., collect()).
+    if (halting ||
+        (flags.recover == "cleanup" && !recovered.future().isPending()) ) {
+      terminate(self());
     }
-  } else {
-    LOG(ERROR) << "Failed to create archive directory '"
-               << archiveDir << ": " << result.error();
-  }
-
-  result = os::rmdir(metaDir);
-  if (result.isError()) {
-    LOG(ERROR) << "Failed to delete meta directory '" << metaDir << "'";
   }
-
-  shutdown();
 }
 
 
@@ -1914,37 +1954,24 @@ void Slave::shutdownExecutor(Framework* 
         framework->state == Framework::TERMINATING)
     << framework->state;
 
-  switch (executor->state) {
-    case Executor::TERMINATING:
-    case Executor::TERMINATED: {
-      LOG(WARNING) << "Ignoring shutdown of executor '" << executor->id
-                   << "' of framework " << framework->id
-                   << " because the executor is terminating/terminated";
-      break;
-    }
-    case Executor::REGISTERING:
-    case Executor::RUNNING: {
-      executor->state = Executor::TERMINATING;
 
-      // If the executor hasn't yet registered, this message
-      // will be dropped to the floor!
-      send(executor->pid, ShutdownExecutorMessage());
+  CHECK(executor->state == Executor::REGISTERING ||
+        executor->state == Executor::RUNNING)
+    << executor->state;
 
-      // Prepare for sending a kill if the executor doesn't comply.
-      delay(flags.executor_shutdown_grace_period,
-          self(),
-          &Slave::shutdownExecutorTimeout,
-          framework->id,
-          executor->id,
-          executor->uuid);
-      break;
-    }
-    default:
-      LOG(FATAL) << "Executor '" << executor->id
-                 << "' of framework " << framework->id
-                 << " is in unexpected state " << executor->state;
-      break;
-  }
+  executor->state = Executor::TERMINATING;
+
+  // If the executor hasn't yet registered, this message
+  // will be dropped to the floor!
+  send(executor->pid, ShutdownExecutorMessage());
+
+  // Prepare for sending a kill if the executor doesn't comply.
+  delay(flags.executor_shutdown_grace_period,
+        self(),
+        &Slave::shutdownExecutorTimeout,
+        framework->id,
+        executor->id,
+        executor->uuid);
 }
 
 
@@ -2273,6 +2300,11 @@ void Slave::recoverFramework(const Frame
       }
     }
   }
+
+  // Remove the framework in case we didn't recover any executors.
+  if (framework->executors.empty()) {
+    remove(framework);
+  }
 }
 
 
@@ -2282,7 +2314,7 @@ Framework::Framework(
     const FrameworkID& _id,
     const FrameworkInfo& _info,
     const UPID& _pid)
-  : state(RUNNING), // TODO(benh): Skipping INITIALIZING for now.
+  : state(RUNNING),
     slave(_slave),
     id(_id),
     info(_info),
@@ -2382,7 +2414,11 @@ Executor* Framework::createExecutor(cons
 
   // Create a directory for the executor.
   const string& directory = paths::createExecutorDirectory(
-      slave->flags.work_dir, slave->info.id(), id, executorInfo.executor_id(), uuid);
+      slave->flags.work_dir,
+      slave->info.id(),
+      id,
+      executorInfo.executor_id(),
+      uuid);
 
   Executor* executor = new Executor(
       slave, id, executorInfo, uuid, directory, info.checkpoint());
@@ -2509,7 +2545,7 @@ Executor::Executor(
     const UUID& _uuid,
     const string& _directory,
     bool _checkpoint)
-  : state(REGISTERING), // TODO(benh): Skipping INITIALIZING for now.
+  : state(REGISTERING),
     slave(_slave),
     id(_info.executor_id()),
     info(_info),

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:01 2013
@@ -228,15 +228,11 @@ protected:
   // Helper to recover a framework from the specified state.
   void recoverFramework(const state::FrameworkState& state, bool reconnect);
 
-  // Called when an executor terminates or a status update
-  // acknowledgement is handled by the status update manager.
-  // This potentially removes the executor and framework and
-  // schedules them for garbage collection.
-  void cleanup(Framework* framework, Executor* executor);
-
-  // Called when the slave is started in 'cleanup' recovery mode and
-  // all the executors have terminated.
-  void cleanup();
+  // Removes and garbage collects the executor.
+  void remove(Framework* framework, Executor* executor);
+
+  // Removes and garbage collects the framework.
+  void remove(Framework* framework);
 
 private:
   Slave(const Slave&);              // No copying.
@@ -289,6 +285,9 @@ private:
 
   double startTime;
 
+  // TODO(Vinod): Add 'state' to slave instead of capturing the
+  // semantics of waiting for registration ('connecting') and
+  // shutting down ('halting') in boolean variables.
   bool connected; // Flag to indicate if slave is registered.
 
   GarbageCollector gc;

Modified: incubator/mesos/trunk/src/slave/state.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/state.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/state.cpp (original)
+++ incubator/mesos/trunk/src/slave/state.cpp Wed Apr 17 07:09:01 2013
@@ -36,15 +36,10 @@ Result<SlaveState> recover(const string&
 
   // Check if the "latest" symlink to a slave directory exists.
   if (!os::exists(latest)) {
-    string message = "Failed to find the latest slave from '" + rootDir + "'";
-    if (safe) {
-      return Error(message);
-    } else {
-      // The slave likely died before it registered and had a chance
-      // to create the "latest" symlink.
-      LOG(WARNING) << message;
-      return None();
-    }
+    // The slave was asked to shutdown or died before it registered
+    // and had a chance to create the "latest" symlink.
+    LOG(INFO) << "Failed to find the latest slave from '" << rootDir << "'";
+    return None();
   }
 
   // Get the latest slave id.

Modified: incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp?rev=1468774&r1=1468773&r2=1468774&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/slave_recovery_tests.cpp Wed Apr 17 07:09:01 2013
@@ -1073,3 +1073,93 @@ TYPED_TEST(SlaveRecoveryTest, GCExecutor
   driver.stop();
   driver.join();
 }
+
+
+// The slave is asked to shutdown. When it comes back up, it should
+// register as a new slave.
+TYPED_TEST(SlaveRecoveryTest, ShutdownSlave)
+{
+  // Scheduler expectations.
+  MockScheduler sched;
+  EXPECT_CALL(sched, registered(_, _, _));
+
+  Future<vector<Offer> > offers1;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillOnce(Return());       // Ignore the offer when slave is shutting down.
+
+  // Enable checkpointing for the framework.
+  FrameworkInfo frameworkInfo;
+  frameworkInfo.CopyFrom(DEFAULT_FRAMEWORK_INFO);
+  frameworkInfo.set_checkpoint(true);
+
+  MesosSchedulerDriver driver(&sched, frameworkInfo, this->master);
+
+  driver.start();
+
+  AWAIT_READY(offers1);
+
+  EXPECT_NE(0u, offers1.get().size());
+
+  TaskInfo task = createTask(offers1.get()[0], "sleep 1000");
+  vector<TaskInfo> tasks;
+  tasks.push_back(task); // Long-running task.
+
+  Future<Nothing> statusUpdate1;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureSatisfy(&statusUpdate1))
+    .WillOnce(Return());  // Ignore TASK_FAILED update.
+
+  Future<Message> registerExecutor =
+    FUTURE_MESSAGE(Eq(RegisterExecutorMessage().GetTypeName()), _, _);
+
+  driver.launchTasks(offers1.get()[0].id(), tasks);
+
+  // Capture the executor pid.
+  AWAIT_READY(registerExecutor);
+  UPID executorPid = registerExecutor.get().from;
+
+  AWAIT_READY(statusUpdate1); // Wait for TASK_RUNNING update.
+
+  Future<Nothing> executorTerminated =
+    FUTURE_DISPATCH(_, &Slave::executorTerminated);
+
+  // We shut down the executor here so that a shutting down slave
+  // does not spend too much time waiting for the executor to exit.
+  process::post(executorPid, ShutdownExecutorMessage());
+
+  Clock::pause();
+
+  // Now advance time until the reaper reaps the executor.
+  while (executorTerminated.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(executorTerminated);
+
+  Clock::resume();
+
+  // Shut down the slave.
+  this->stopSlave(true);
+
+  Future<vector<Offer> > offers2;
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return());        // Ignore subsequent offers.
+
+  // Now restart the slave.
+  this->startSlave();
+
+  // Ensure that the slave registered with a new id.
+  AWAIT_UNTIL(offers2);
+
+  EXPECT_NE(0u, offers2.get().size());
+
+  // Ensure the slave id is different.
+  ASSERT_NE(
+      offers1.get()[0].slave_id().value(), offers2.get()[0].slave_id().value());
+
+  driver.stop();
+  driver.join();
+}