Posted to commits@mesos.apache.org by vi...@apache.org on 2013/04/17 09:09:12 UTC

svn commit: r1468777 - in /incubator/mesos/trunk/src: slave/slave.cpp slave/slave.hpp tests/gc_tests.cpp

Author: vinodkone
Date: Wed Apr 17 07:09:12 2013
New Revision: 1468777

URL: http://svn.apache.org/r1468777
Log:
Fixed slave to properly schedule/unschedule top level executor
directories for garbage collection.

Review: https://reviews.apache.org/r/10413
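
The heart of the change appears in Slave::runTask() below: each directory
that must survive the new launch is unscheduled from gc by chaining onto a
trivially ready future, and the task is only run once the whole chain has
terminated. A condensed sketch of the pattern, using only names that appear
in this diff (error handling elided):

    Future<bool> unschedule = true;  // Trivially ready future.

    // Chain one unschedule per directory that must not be gc'ed.
    if (os::exists(path)) {
      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
    }

    // Run the task only after the whole chain has terminated; the chain's
    // final state is passed to _runTask() as its first argument.
    unschedule.onAny(
        defer(self(),
              &Self::_runTask,
              params::_1,
              frameworkInfo,
              frameworkId,
              pid,
              task));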

Modified:
    incubator/mesos/trunk/src/slave/slave.cpp
    incubator/mesos/trunk/src/slave/slave.hpp
    incubator/mesos/trunk/src/tests/gc_tests.cpp

Modified: incubator/mesos/trunk/src/slave/slave.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.cpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.cpp (original)
+++ incubator/mesos/trunk/src/slave/slave.cpp Wed Apr 17 07:09:12 2013
@@ -384,7 +384,8 @@ void Slave::initialize()
 void Slave::_initialize(const Future<Nothing>& future)
 {
   if (!future.isReady()) {
-    LOG(FATAL) << "Recovery failure: " << future.failure();
+    LOG(FATAL) << "Recovery failure: "
+               << (future.isFailed() ? future.failure() : "future discarded");
   }
 
   LOG(INFO) << "Finished recovery";
@@ -516,10 +517,11 @@ void Slave::fileAttached(const Future<No
 }
 
 
-void Slave::detachFile(const Future<Nothing>& result, const string& path)
+// TODO(vinod/bmahler): Get rid of this helper.
+Nothing Slave::detachFile(const string& path)
 {
-  CHECK(!result.isDiscarded());
   files->detach(path);
+  return Nothing();
 }
 
 
@@ -698,6 +700,14 @@ void Slave::doReliableRegistration()
 }
 
 
+// Helper to unschedule the path.
+// TODO(vinod): Can we avoid this helper?
+Future<bool> Slave::unschedule(const string& path)
+{
+  return gc.unschedule(path);
+}
+
+
 // TODO(vinod): Instead of crashing the slave on checkpoint errors,
 // send TASK_LOST to the framework.
 void Slave::runTask(
@@ -709,10 +719,102 @@ void Slave::runTask(
   LOG(INFO) << "Got assigned task " << task.task_id()
             << " for framework " << frameworkId;
 
+  Future<bool> unschedule = true;
+
+  Executor* executor = NULL;
+  ExecutorID executorId = getExecutorInfo(task).executor_id();
+
+  Framework* framework = getFramework(frameworkId);
+  // If we are about to create a new framework, unschedule the work
+  // and meta directories so they are not gc'ed.
+  if (framework == NULL) {
+    // Framework work directory.
+    string path = paths::getFrameworkPath(
+        flags.work_dir, info.id(), frameworkId);
+
+    if (os::exists(path)) {
+      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+    }
+
+    // Framework meta directory.
+    path = paths::getFrameworkPath(metaDir, info.id(), frameworkId);
+    if (os::exists(path)) {
+      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+    }
+  } else {
+    // We add the corresponding executor to 'pending' to ensure the
+    // framework and top level executor directories are not scheduled
+    // for deletion before '_runTask()' is called.
+    framework->pending.insert(executorId);
+
+    executor = framework->getExecutor(executorId);
+  }
+
+  // If we are about to create a new executor, unschedule the top
+  // level work and meta directories so they are not gc'ed.
+  if (executor == NULL) {
+    // Executor work directory.
+    string path = paths::getExecutorPath(
+        flags.work_dir, info.id(), frameworkId, executorId);
+
+    if (os::exists(path)) {
+      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+    }
+
+    // Executor meta directory.
+    path = paths::getExecutorPath(
+        metaDir, info.id(), frameworkId, executorId);
+
+    if (os::exists(path)) {
+      unschedule = unschedule.then(defer(self(), &Self::unschedule, path));
+    }
+  }
+
+  // Run the task after the unschedule operations are done.
+  unschedule.onAny(
+      defer(self(),
+            &Self::_runTask,
+            params::_1,
+            frameworkInfo,
+            frameworkId,
+            pid,
+            task));
+}
+
+
+void Slave::_runTask(
+    const Future<bool>& future,
+    const FrameworkInfo& frameworkInfo,
+    const FrameworkID& frameworkId,
+    const string& pid,
+    const TaskInfo& task)
+{
+  LOG(INFO) << "Launching task " << task.task_id()
+            << " for framework " << frameworkId;
+
+  if (!future.isReady()) {
+    LOG(ERROR) << "Failed to unschedule directories scheduled for gc: "
+               << (future.isFailed() ? future.failure() : "future discarded");
+
+    const StatusUpdate& update = protobuf::createStatusUpdate(
+        frameworkId,
+        info.id(),
+        task.task_id(),
+        TASK_LOST,
+        "Could not launch the task because we failed to unschedule directories"
+        " scheduled for gc");
+
+    statusUpdate(update);
+    return;
+  }
+
   CHECK(state == RECOVERING || state == DISCONNECTED ||
         state == RUNNING || state == TERMINATING)
     << state;
 
+  // This could happen if the runTask message was intended for the
+  // old slave, but the new (restarted) slave received it because
+  // it has the same pid.
   if (state != RUNNING) {
     LOG(WARNING) << "Cannot run task " << task.task_id()
                  << " of framework " << frameworkId
@@ -768,7 +870,7 @@ void Slave::runTask(
     return;
   }
 
-  const ExecutorInfo& executorInfo = framework->getExecutorInfo(task);
+  const ExecutorInfo& executorInfo = getExecutorInfo(task);
   const ExecutorID& executorId = executorInfo.executor_id();
 
   // Either send the task to an executor or start a new executor
@@ -1783,6 +1885,59 @@ Framework* Slave::getFramework(const Fra
 }
 
 
+ExecutorInfo Slave::getExecutorInfo(const TaskInfo& task)
+{
+  CHECK(task.has_executor() != task.has_command());
+
+  if (task.has_command()) {
+    ExecutorInfo executor;
+
+    // Command executors share the same id as the task.
+    executor.mutable_executor_id()->set_value(task.task_id().value());
+
+    // Prepare an executor name which includes information on the
+    // command being launched.
+    string name =
+      "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
+
+    if (task.command().value().length() > 15) {
+      name += task.command().value().substr(0, 12) + "...')";
+    } else {
+      name += task.command().value() + "')";
+    }
+
+    executor.set_name("Command Executor " + name);
+    executor.set_source(task.task_id().value());
+
+    // Copy the CommandInfo to get the URIs and environment, but
+    // update it to invoke 'mesos-executor' (unless we couldn't
+    // resolve 'mesos-executor' via 'realpath', in which case just
+    // echo the error and exit).
+    executor.mutable_command()->MergeFrom(task.command());
+
+    Try<string> path = os::realpath(
+        path::join(flags.launcher_dir, "mesos-executor"));
+
+    if (path.isSome()) {
+      executor.mutable_command()->set_value(path.get());
+    } else {
+      executor.mutable_command()->set_value(
+          "echo '" + path.error() + "'; exit 1");
+    }
+
+    // TODO(benh): Set some resources for the executor so that a task
+    // doesn't end up getting killed because the amount of resources
+    // of the executor went over those allocated. Note that this might
+    // mean that the number of resources on the machine will actually
+    // be slightly oversubscribed, so we'll need to reevaluate with
+    // respect to resources that can't be oversubscribed.
+    return executor;
+  }
+
+  return task.executor();
+}
+
+
 void _watch(
     const Future<Nothing>& watch,
     const FrameworkID& frameworkId,
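
A quick worked example of the executor name truncation above, for a
hypothetical task 't1' (plain C++, only <string> required):

    std::string command = "echo hello world!!";  // 18 characters, > 15.
    std::string name = "(Task: t1) (Command: sh -c '";
    if (command.length() > 15) {
      name += command.substr(0, 12) + "...')";   // Keeps "echo hello w".
    } else {
      name += command + "')";
    }
    // name == "(Task: t1) (Command: sh -c 'echo hello w...')"
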
@@ -2010,8 +2165,8 @@ void Slave::remove(Framework* framework,
   CHECK_NOTNULL(framework);
   CHECK_NOTNULL(executor);
 
-  LOG(INFO) << "Cleaning up executor '" << executor->id << "'"
-            << " of framework " << framework->id;
+  LOG(INFO) << "Cleaning up executor '" << executor->id
+            << "' of framework " << framework->id;
 
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING);
@@ -2024,23 +2179,40 @@ void Slave::remove(Framework* framework,
         (executor->updates.empty() ||
          framework->state == Framework::TERMINATING));
 
+  // TODO(vinod): Move the responsibility of gc'ing to the
+  // Executor struct.
+
   // Schedule the executor run work directory to get garbage collected.
-  // TODO(vinod): Also schedule the top level executor work directory.
-  gc.schedule(flags.gc_delay, executor->directory).onAny(
-      defer(self(), &Self::detachFile, params::_1, executor->directory));
+  const string& path = paths::getExecutorRunPath(
+      flags.work_dir, info.id(), framework->id, executor->id, executor->uuid);
+
+  gc.schedule(flags.gc_delay, path)
+    .then(defer(self(), &Self::detachFile, path));
+
+  // Schedule the top level executor work directory, but only if the
+  // framework doesn't have any 'pending' tasks for this executor.
+  if (!framework->pending.contains(executor->id)) {
+    const string& path = paths::getExecutorPath(
+        flags.work_dir, info.id(), framework->id, executor->id);
+
+    gc.schedule(flags.gc_delay, path);
+  }
 
   if (executor->checkpoint) {
     // Schedule the executor run meta directory to get garbage collected.
-    // TODO(vinod): Also schedule the top level executor meta directory.
-    const string& executorMetaDir = paths::getExecutorRunPath(
-        metaDir,
-        info.id(),
-        framework->id,
-        executor->id,
-        executor->uuid);
+    const string& path = paths::getExecutorRunPath(
+        metaDir, info.id(), framework->id, executor->id, executor->uuid);
 
-    gc.schedule(flags.gc_delay, executorMetaDir).onAny(
-        defer(self(), &Self::detachFile, params::_1, executorMetaDir));
+    gc.schedule(flags.gc_delay, path);
+
+    // Schedule the top level executor meta directory, but only if the
+    // framework doesn't have any 'pending' tasks for this executor.
+    if (!framework->pending.contains(executor->id)) {
+      const string& path = paths::getExecutorPath(
+          metaDir, info.id(), framework->id, executor->id);
+
+      gc.schedule(flags.gc_delay, path);
+    }
   }
 
   framework->destroyExecutor(executor->id);
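
The 'pending' check above closes a race between a terminating executor run
and a new task arriving for the same executor id: runTask() inserts the id
before its asynchronous unschedules start, so remove() must not schedule
the top level directories for gc in the meantime. A minimal standalone
model of the guard, with std::set standing in for stout's hashset
(illustrative only, not Mesos code):

    #include <iostream>
    #include <set>
    #include <string>

    std::set<std::string> pending;  // Executors with in-flight tasks.

    void runTask(const std::string& executorId)
    {
      pending.insert(executorId);   // Mark before the async unschedule.
    }

    void removeExecutor(const std::string& executorId)
    {
      if (pending.count(executorId) == 0) {
        std::cout << "gc.schedule(top level dirs of " << executorId << ")\n";
      } else {
        std::cout << "skip gc: " << executorId << " has a pending task\n";
      }
    }

    int main()
    {
      runTask("E");         // A new task for executor 'E' is in flight...
      removeExecutor("E");  // ...so the old run's cleanup skips the gc.
      return 0;
    }
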
@@ -2056,7 +2228,7 @@ void Slave::remove(Framework* framework)
 {
   CHECK_NOTNULL(framework);
 
-  LOG(INFO) << "Cleaning up framework " << framework->id;
+  LOG(INFO) << "Cleaning up framework " << framework->id;
 
   CHECK(framework->state == Framework::RUNNING ||
         framework->state == Framework::TERMINATING);
@@ -2066,24 +2238,24 @@ void Slave::remove(Framework* framework)
   // Close all status update streams for this framework.
   statusUpdateManager->cleanup(framework->id);
 
-  // Schedule the framework work directory to get garbage collected.
-  const string& frameworkDir = paths::getFrameworkPath(
-      flags.work_dir,
-      info.id(),
-      framework->id);
-
-  gc.schedule(flags.gc_delay, frameworkDir).onAny(
-      defer(self(), &Self::detachFile, params::_1, frameworkDir));
-
-  if (framework->info.checkpoint()) {
-    // Schedule the framework meta directory to get garbage collected.
-    const string& frameworkMetaDir = paths::getFrameworkPath(
-        metaDir,
-        info.id(),
-        framework->id);
 
-    gc.schedule(flags.gc_delay, frameworkMetaDir).onAny(
-        defer(self(), &Self::detachFile, params::_1, frameworkMetaDir));
+  // Schedule the framework work and meta directories for garbage
+  // collection, but only if the framework has no pending tasks.
+  // TODO(vinod): Move the responsibility of gc'ing to the
+  // Framework struct.
+  if (framework->pending.empty()) {
+    const string& path = paths::getFrameworkPath(
+        flags.work_dir, info.id(), framework->id);
+
+    gc.schedule(flags.gc_delay, path);
+
+    if (framework->info.checkpoint()) {
+      // Schedule the framework meta directory to get garbage collected.
+      const string& path = paths::getFrameworkPath(
+          metaDir, info.id(), framework->id);
+
+      gc.schedule(flags.gc_delay, path);
+    }
   }
 
   frameworks.erase(framework->id);
@@ -2099,7 +2271,7 @@ void Slave::remove(Framework* framework)
     // and shutdownExecutor() could return Futures and a slave could
     // shutdown when all the Futures are satisfied (e.g., collect()).
     if (state == TERMINATING ||
-        (flags.recover == "cleanup" && !recovered.future().isPending()) ) {
+        (flags.recover == "cleanup" && !recovered.future().isPending())) {
       terminate(self());
     }
   }
@@ -2522,59 +2694,6 @@ Framework::~Framework()
 }
 
 
-ExecutorInfo Framework::getExecutorInfo(const TaskInfo& task)
-{
-  CHECK(task.has_executor() != task.has_command());
-
-  if (task.has_command()) {
-    ExecutorInfo executor;
-
-    // Command executors share the same id as the task.
-    executor.mutable_executor_id()->set_value(task.task_id().value());
-
-    // Prepare an executor name which includes information on the
-    // command being launched.
-    string name =
-      "(Task: " + task.task_id().value() + ") " + "(Command: sh -c '";
-
-    if (task.command().value().length() > 15) {
-      name += task.command().value().substr(0, 12) + "...')";
-    } else {
-      name += task.command().value() + "')";
-    }
-
-    executor.set_name("Command Executor " + name);
-    executor.set_source(task.task_id().value());
-
-    // Copy the CommandInfo to get the URIs and environment, but
-    // update it to invoke 'mesos-executor' (unless we couldn't
-    // resolve 'mesos-executor' via 'realpath', in which case just
-    // echo the error and exit).
-    executor.mutable_command()->MergeFrom(task.command());
-
-    Try<string> path = os::realpath(
-        path::join(slave->flags.launcher_dir, "mesos-executor"));
-
-    if (path.isSome()) {
-      executor.mutable_command()->set_value(path.get());
-    } else {
-      executor.mutable_command()->set_value(
-          "echo '" + path.error() + "'; exit 1");
-    }
-
-    // TODO(benh): Set some resources for the executor so that a task
-    // doesn't end up getting killed because the amount of resources
-    // of the executor went over those allocated. Note that this might
-    // mean that the number of resources on the machine will actually
-    // be slightly oversubscribed, so we'll need to reevaluate with
-    // respect to resources that can't be oversubscribed.
-    return executor;
-  }
-
-  return task.executor();
-}
-
-
 Executor* Framework::createExecutor(const ExecutorInfo& executorInfo)
 {
   // We create a UUID for the new executor. The UUID uniquely
@@ -2648,11 +2767,11 @@ Executor* Framework::recoverExecutor(con
                  << "' of framework " << id
                  << " because its latest run cannot be recovered";
 
-    // GC the executor work directory.
+    // GC the top level executor work directory.
     slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorPath(
         slave->flags.work_dir, slave->info.id(), id, state.id));
 
-    // GC the executor meta directory.
+    // GC the top level executor meta directory.
     slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorPath(
         slave->metaDir, slave->info.id(), id, state.id));
 
@@ -2665,13 +2784,17 @@ Executor* Framework::recoverExecutor(con
   foreachvalue (const RunState& run, state.runs) {
     CHECK_SOME(run.id);
     if (uuid != run.id.get()) {
-      // GC the run's work directory.
+      // GC the executor run's work directory.
       slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorRunPath(
           slave->flags.work_dir, slave->info.id(), id, state.id, run.id.get()));
 
-      // GC the run's meta directory.
+      // GC the executor run's meta directory.
       slave->gc.schedule(slave->flags.gc_delay, paths::getExecutorRunPath(
           slave->metaDir, slave->info.id(), id, state.id, run.id.get()));
+
+      // NOTE: We don't schedule the top level executor work and meta
+      // directories for GC here, because they will be scheduled when
+      // the latest executor run terminates.
     }
   }
 

Modified: incubator/mesos/trunk/src/slave/slave.hpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/slave/slave.hpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/slave/slave.hpp (original)
+++ incubator/mesos/trunk/src/slave/slave.hpp Wed Apr 17 07:09:12 2013
@@ -32,6 +32,7 @@
 #include <process/protobuf.hpp>
 
 #include <stout/hashmap.hpp>
+#include <stout/hashset.hpp>
 #include <stout/multihashmap.hpp>
 #include <stout/os.hpp>
 #include <stout/owned.hpp>
@@ -99,6 +100,15 @@ public:
       const std::string& pid,
       const TaskInfo& task);
 
+  void _runTask(
+      const Future<bool>& future,
+      const FrameworkInfo& frameworkInfo,
+      const FrameworkID& frameworkId,
+      const std::string& pid,
+      const TaskInfo& task);
+
+  Future<bool> unschedule(const std::string& path);
+
   void killTask(const FrameworkID& frameworkId, const TaskID& taskId);
 
   void shutdownFramework(const FrameworkID& frameworkId);
@@ -197,11 +207,15 @@ protected:
 
   void fileAttached(const Future<Nothing>& result, const std::string& path);
 
-  void detachFile(const Future<Nothing>& result, const std::string& path);
+  Nothing detachFile(const std::string& path);
 
   // Helper routine to lookup a framework.
   Framework* getFramework(const FrameworkID& frameworkId);
 
+  // Returns an ExecutorInfo for a TaskInfo (possibly
+  // constructing one if the task has a CommandInfo).
+  ExecutorInfo getExecutorInfo(const TaskInfo& task);
+
   // Handle the second phase of shutting down an executor for those
   // executors that have not properly shutdown within a timeout.
   void shutdownExecutorTimeout(
@@ -376,9 +390,6 @@ struct Framework
 
   ~Framework();
 
-  // Returns an ExecutorInfo for a TaskInfo (possibly
-  // constructing one if the task has a CommandInfo).
-  ExecutorInfo getExecutorInfo(const TaskInfo& task);
   Executor* createExecutor(const ExecutorInfo& executorInfo);
   void destroyExecutor(const ExecutorID& executorId);
   Executor* getExecutor(const ExecutorID& executorId);
@@ -400,7 +411,7 @@ struct Framework
 
   UPID pid;
 
-  std::vector<TaskInfo> pending; // Pending tasks (used while INITIALIZING).
+  hashset<ExecutorID> pending; // Executors with pending tasks.
 
   // Current running executors.
   hashmap<ExecutorID, Executor*> executors;
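
The repurposed 'pending' member is driven entirely from slave.cpp above; a
summary sketch of its lifecycle, listing only the accessors that appear in
this patch:

    hashset<ExecutorID> pending;

    pending.insert(executorId);     // runTask(): a task for this executor
                                    // is in flight; block gc scheduling.
    pending.contains(executor->id); // remove(framework, executor): skip
                                    // gc of the top level directories.
    pending.empty();                // remove(framework): skip gc of the
                                    // framework directories.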

Modified: incubator/mesos/trunk/src/tests/gc_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/mesos/trunk/src/tests/gc_tests.cpp?rev=1468777&r1=1468776&r2=1468777&view=diff
==============================================================================
--- incubator/mesos/trunk/src/tests/gc_tests.cpp (original)
+++ incubator/mesos/trunk/src/tests/gc_tests.cpp Wed Apr 17 07:09:12 2013
@@ -51,6 +51,7 @@
 #include "slave/constants.hpp"
 #include "slave/flags.hpp"
 #include "slave/gc.hpp"
+#include "slave/isolator.hpp"
 #include "slave/paths.hpp"
 #include "slave/slave.hpp"
 
@@ -64,6 +65,7 @@ using mesos::internal::master::Master;
 
 using mesos::internal::slave::GarbageCollector;
 using mesos::internal::slave::GarbageCollectorProcess;
+using mesos::internal::slave::Isolator;
 using mesos::internal::slave::Slave;
 
 using process::Clock;
@@ -637,3 +639,128 @@ TEST_F(GarbageCollectorIntegrationTest, 
 
   cluster.shutdown(); // Must shutdown before 'isolator' gets deallocated.
 }
+
+
+// This test verifies that launching a new executor unschedules the
+// framework and executor work directories created by an old executor
+// (with the same id) from garbage collection.
+TEST_F(GarbageCollectorIntegrationTest, Unschedule)
+{
+  ASSERT_TRUE(GTEST_IS_THREADSAFE);
+
+  Try<PID<Master> > master = cluster.masters.start();
+  ASSERT_SOME(master);
+
+  Future<SlaveRegisteredMessage> slaveRegistered =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  MockExecutor exec;
+
+  TestingIsolator isolator(DEFAULT_EXECUTOR_ID, &exec);
+
+  Try<PID<Slave> > slave = cluster.slaves.start(&isolator);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegistered);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(_, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Resources resources = Resources::parse(cluster.slaves.flags.resources.get());
+  double cpus = resources.get("cpus", Value::Scalar()).value();
+  double mem = resources.get("mem", Value::Scalar()).value();
+
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(1, cpus, mem));
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(status);
+
+  EXPECT_EQ(TASK_RUNNING, status.get().state());
+
+  // TODO(benh/vinod): Would've been great to match the dispatch
+  // against arguments here.
+  // NOTE: Since Google Mock selects the last matching expectation
+  // that is still active, the order of (un)schedule expectations
+  // below is the reverse of the actual (un)schedule call order.
+
+  // Schedule framework work directory.
+  Future<Nothing> scheduleFrameworkWork =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+  // Schedule top level executor work directory.
+  Future<Nothing> scheduleExecutorWork =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+  // Schedule executor run work directory.
+  Future<Nothing> scheduleExecutorRunWork =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::schedule);
+
+  // Unschedule top level executor work directory.
+  Future<Nothing> unscheduleExecutorWork =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::unschedule);
+
+  // Unschedule framework work directory.
+  Future<Nothing> unscheduleFrameworkWork =
+    FUTURE_DISPATCH(_, &GarbageCollectorProcess::unschedule);
+
+  // Launch the next run of the executor on receipt of the next offer.
+  EXPECT_CALL(sched, resourceOffers(_, _))
+    .WillOnce(LaunchTasks(1, cpus, mem));
+
+  EXPECT_CALL(sched, statusUpdate(_, _))
+    .WillRepeatedly(Return());            // Ignore subsequent updates.
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec, shutdown(_))
+    .WillRepeatedly(Return());
+
+  Clock::pause();
+
+  // Kill the first executor.
+  process::dispatch(
+      isolator,
+      &Isolator::killExecutor,
+      frameworkId.get(),
+      DEFAULT_EXECUTOR_ID);
+
+  AWAIT_READY(scheduleExecutorRunWork);
+  AWAIT_READY(scheduleExecutorWork);
+  AWAIT_READY(scheduleFrameworkWork);
+
+  // Advance the clock to speed up the allocator.
+  while (unscheduleFrameworkWork.isPending()) {
+    Clock::advance(Seconds(1));
+    Clock::settle();
+  }
+
+  AWAIT_READY(unscheduleFrameworkWork);
+  AWAIT_READY(unscheduleExecutorWork);
+
+  Clock::resume();
+
+  driver.stop();
+  driver.join();
+
+  cluster.shutdown(); // Must shutdown before 'isolator' gets deallocated.
+}
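
The NOTE in the test above (Google Mock picks the last matching expectation
that is still active) is easy to trip over, so here is a minimal standalone
illustration; it assumes gtest/gmock, and MockThing is a hypothetical type:

    #include <gmock/gmock.h>
    #include <gtest/gtest.h>

    using ::testing::_;
    using ::testing::Return;

    struct MockThing
    {
      MOCK_METHOD1(f, int(int));
    };

    TEST(ExpectationOrder, NewestActiveExpectationMatchesFirst)
    {
      MockThing mock;

      EXPECT_CALL(mock, f(_))    // Declared first, matched second.
        .WillOnce(Return(1));

      EXPECT_CALL(mock, f(_))    // Declared last, matched first.
        .WillOnce(Return(2))
        .RetiresOnSaturation();  // Retire once saturated so the earlier
                                 // expectation becomes reachable again.

      EXPECT_EQ(2, mock.f(0));
      EXPECT_EQ(1, mock.f(0));
    }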