You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/06/08 00:06:48 UTC

[1/2] git commit: Fixed master to properly do task reconciliation when slave re-registers.

Updated Branches:
  refs/heads/master 9ae1d01d1 -> 3f07a9437


Fixed master to properly do task reconciliation when slave re-registers.

Review: https://reviews.apache.org/r/11229


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/3f07a943
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/3f07a943
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/3f07a943

Branch: refs/heads/master
Commit: 3f07a94375c79b3bc934b836102742b47fd6d52d
Parents: 15c0040
Author: Vinod Kone <vi...@twitter.com>
Authored: Fri May 17 17:09:36 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:04:42 2013 -0700

----------------------------------------------------------------------
 src/master/master.cpp               |   91 +++++++++++++++--------------
 src/master/master.hpp               |   17 ++++--
 src/slave/slave.cpp                 |    5 ++
 src/slave/slave.hpp                 |    2 -
 src/tests/fault_tolerance_tests.cpp |   93 +++++++++++++++++++++++++++++-
 5 files changed, 156 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index a2e4b90..60c6d4f 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -945,50 +945,8 @@ void Master::reregisterSlave(const SlaveID& slaveId,
                    << ") is being allowed to re-register with an already"
                    << " in use id (" << slaveId << ")";
 
-      // Consolidate tasks between master and the slave.
-      // Fist, look for the tasks present in the slave but not present
-      // in the master.
-      multihashmap<FrameworkID, TaskID> slaveTasks;
-      foreach (const Task& task, tasks) {
-        if (!slave->tasks.contains(
-            std::make_pair(task.framework_id(), task.task_id()))) {
-          // This might happen if a terminal status update for this task
-          // came before the slave re-registered message.
-          // TODO(vinod): Consider sending a KillTaskMessage.
-          // TODO(vinod): Export a statistic for these tasks.
-          LOG(WARNING) << "Slave " << slaveId << " attempted to re-register"
-                       << " with unknown task " << task.task_id()
-                       << " of framework " << task.framework_id();
-        }
-        slaveTasks.put(task.framework_id(), task.task_id());
-      }
-
-      // Send TASK_LOST updates for tasks present in the master but
-      // missing from the slave. This could happen if the task was
-      // dropped by the slave (e.g., slave exited before getting the
-      // task or the task was launched while slave was in recovery).
-      foreachvalue (Task* task, utils::copy(slave->tasks)) {
-        if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
-          LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
-                       << " of framework " << task->framework_id()
-                       << " unknown to the slave " << slaveId;
-
-          Framework* framework = getFramework(task->framework_id());
-          if (framework != NULL) {
-            const StatusUpdate& update = protobuf::createStatusUpdate(
-                task->framework_id(),
-                slaveId,
-                task->task_id(),
-                TASK_LOST,
-                "Task was not received by the slave");
-
-            StatusUpdateMessage message;
-            message.mutable_update()->CopyFrom(update);
-            send(framework->pid, message);
-          }
-          removeTask(task);
-        }
-      }
+      // Reconcile tasks between master and the slave.
+      reconcileTasks(slave, tasks);
 
       SlaveReregisteredMessage message;
       message.mutable_slave_id()->MergeFrom(slave->id);
@@ -1648,6 +1606,46 @@ Resources Master::launchTask(const TaskInfo& task,
 }
 
 
+void Master::reconcileTasks(Slave* slave, const vector<Task>& tasks)
+{
+  CHECK_NOTNULL(slave);
+
+  // We convert the 'tasks' into a map for easier lookup below.
+  // TODO(vinod): Check if the tasks are known to the master.
+  multihashmap<FrameworkID, TaskID> slaveTasks;
+  foreach (const Task& task, tasks) {
+    slaveTasks.put(task.framework_id(), task.task_id());
+  }
+
+  // Send TASK_LOST updates for tasks present in the master but
+  // missing from the slave. This could happen if the task was
+  // dropped by the slave (e.g., slave exited before getting the
+  // task or the task was launched while slave was in recovery).
+  foreachvalue (Task* task, utils::copy(slave->tasks)) {
+    if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
+      LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+                   << " of framework " << task->framework_id()
+                   << " unknown to the slave " << slave->id;
+
+      Framework* framework = getFramework(task->framework_id());
+      if (framework != NULL) {
+        const StatusUpdate& update = protobuf::createStatusUpdate(
+            task->framework_id(),
+            slave->id,
+            task->task_id(),
+            TASK_LOST,
+            "Task was not received by the slave");
+
+        StatusUpdateMessage message;
+        message.mutable_update()->CopyFrom(update);
+        send(framework->pid, message);
+      }
+      removeTask(task);
+    }
+  }
+}
+
+
 void Master::addFramework(Framework* framework, bool reregister)
 {
   CHECK(frameworks.count(framework->id) == 0);
@@ -1920,6 +1918,11 @@ void Master::readdSlave(Slave* slave,
   }
 
   foreach (const Task& task, tasks) {
+    // Ignore tasks that have reached terminal state.
+    if (protobuf::isTerminalState(task.state())) {
+      continue;
+    }
+
     Task* t = new Task(task);
 
     // Add the task to the slave.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8e7b74c..86c5232 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -132,11 +132,18 @@ protected:
   // Process a launch tasks request (for a non-cancelled offer) by
   // launching the desired tasks (if the offer contains a valid set of
   // tasks) and reporting any unused resources to the allocator.
-  void processTasks(Offer* offer,
-                    Framework* framework,
-                    Slave* slave,
-                    const std::vector<TaskInfo>& tasks,
-                    const Filters& filters);
+  void processTasks(
+      Offer* offer,
+      Framework* framework,
+      Slave* slave,
+      const std::vector<TaskInfo>& tasks,
+      const Filters& filters);
+
+  // Reconciles a re-registering slave's tasks and sends TASK_LOST
+  // updates for tasks known to the master but unknown to the slave.
+  void reconcileTasks(
+      Slave* slave,
+      const std::vector<Task>& tasks);
 
   // Add a framework.
   void addFramework(Framework* framework, bool reregister = false);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 840c64d..b5b7e0e 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -703,6 +703,11 @@ void Slave::doReliableRegistration()
 
           message.add_tasks()->CopyFrom(t);
         }
+
+        // Add terminated tasks.
+        foreachvalue (Task* task, executor->terminatedTasks) {
+          message.add_tasks()->CopyFrom(*task);
+        }
       }
     }
     send(master, message);

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 5aba7ed..d1ba82e 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -376,8 +376,6 @@ struct Executor
   hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
   boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
-  multihashmap<TaskID, UUID> updates; // Pending updates.
-
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/3f07a943/src/tests/fault_tolerance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/fault_tolerance_tests.cpp b/src/tests/fault_tolerance_tests.cpp
index ef570b7..4afbbec 100644
--- a/src/tests/fault_tolerance_tests.cpp
+++ b/src/tests/fault_tolerance_tests.cpp
@@ -1378,7 +1378,7 @@ TEST_F(FaultToleranceTest, SlaveReregisterTerminatedExecutor)
 // This test verifies that the master sends TASK_LOST updates
 // for tasks in the master absent from the re-registered slave.
 // We do this by dropping RunTaskMessage from master to the slave.
-TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
+TEST_F(FaultToleranceTest, ReconcileLostTasks)
 {
   Try<PID<Master> > master = StartMaster();
   ASSERT_SOME(master);
@@ -1448,3 +1448,94 @@ TEST_F(FaultToleranceTest, ConsolidateTasksOnSlaveReregistration)
 
   Shutdown();
 }
+
+
+// This test verifies that when the slave re-registers, the master
+// does not send TASK_LOST update for a task that has reached terminal
+// state but is waiting for an acknowledgement.
+TEST_F(FaultToleranceTest, ReconcileIncompleteTasks)
+{
+  Try<PID<Master> > master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave> > slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(&sched, DEFAULT_FRAMEWORK_INFO, master.get());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer> > offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  TaskInfo task;
+  task.set_name("test task");
+  task.mutable_task_id()->set_value("1");
+  task.mutable_slave_id()->MergeFrom(offers.get()[0].slave_id());
+  task.mutable_resources()->MergeFrom(offers.get()[0].resources());
+  task.mutable_executor()->MergeFrom(DEFAULT_EXECUTOR_INFO);
+
+  vector<TaskInfo> tasks;
+  tasks.push_back(task);
+
+  EXPECT_CALL(exec, registered(_, _, _, _));
+
+  // Send a terminal update right away.
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_FINISHED));
+
+  // Drop the status update from slave to the master, so that
+  // the slave has a pending terminal update when it re-registers.
+  DROP_PROTOBUF(StatusUpdateMessage(), _, master.get());
+
+  Future<Nothing> _statusUpdate = FUTURE_DISPATCH(_, &Slave::_statusUpdate);
+
+  Future<TaskStatus> status;
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&status));
+
+  Clock::pause();
+
+  driver.launchTasks(offers.get()[0].id(), tasks);
+
+  AWAIT_READY(_statusUpdate);
+
+  Future<SlaveReregisteredMessage> slaveReregisteredMessage =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // Simulate a spurious newMasterDetected event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  NewMasterDetectedMessage message;
+  message.set_pid(master.get());
+
+  process::post(slave.get(), message);
+
+  AWAIT_READY(slaveReregisteredMessage);
+
+  // The master should not send a TASK_LOST after the slave
+  // re-registers. We check this by advancing the clock so that
+  // the only update the scheduler receives is the retried
+  // TASK_FINISHED update.
+  Clock::advance(STATUS_UPDATE_RETRY_INTERVAL);
+
+  AWAIT_READY(status);
+  ASSERT_EQ(TASK_FINISHED, status.get().state());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}


[2/2] git commit: Fixed slave to properly handle terminated tasks that have pending updates.

Posted by vi...@apache.org.
Fixed slave to properly handle terminated tasks that have pending
updates.

Review: https://reviews.apache.org/r/11694


Project: http://git-wip-us.apache.org/repos/asf/incubator-mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mesos/commit/15c00409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mesos/tree/15c00409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mesos/diff/15c00409

Branch: refs/heads/master
Commit: 15c00409e498ad119c9c92571980c048dbc7a2ef
Parents: 9ae1d01
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 5 22:47:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:04:42 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp |  116 ++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |   14 ++++--
 2 files changed, 75 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/15c00409/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ce1646..840c64d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1041,21 +1041,14 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
 
   switch (executor->state) {
     case Executor::REGISTERING: {
-      if (executor->queuedTasks.contains(taskId)) {
-        // We remove the task here so that if this executor registers at
-        // a later point in time it won't be sent this task.
-        LOG(WARNING) << "Removing queued task " << taskId
-                     << " from executor '" << executor->id
-                     << "' of framework " << frameworkId
-                     << " because the executor hasn't registered yet";
-        executor->removeTask(taskId);
-      } else {
-        LOG(WARNING) << "Cannot kill task " << taskId
-                     << " of framework " << frameworkId
-                     << " because the executor '" << executor->id
-                     << "' hasn't registered yet";
-      }
+      LOG(WARNING) << "Removing queued task " << taskId
+                   << " of framework " << frameworkId
+                   << " because the executor '" << executor->id
+                   << "' hasn't registered yet";
 
+      // NOTE: Sending a TASK_KILLED update removes the task from
+      // Executor::queuedTasks, so that if the executor registers at
+      // a later point in time, it won't get this task.
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
@@ -1376,11 +1369,13 @@ void Slave::_statusUpdateAcknowledgement(
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  executor->updates.remove(taskId, uuid);
+  if (executor->terminatedTasks.contains(taskId)) {
+    executor->completeTask(taskId);
+  }
 
   // Remove the executor if it has terminated and there are no more
-  // pending updates.
-  if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+  // incomplete tasks.
+  if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
     remove(framework, executor);
   }
 }
@@ -1594,15 +1589,13 @@ void Slave::reregisterExecutor(
       send(executor->pid, message);
 
       // Handle all the pending updates.
+      // The status update manager might have already checkpointed some
+      // of these pending updates (for example, if the slave died right
+      // after it checkpointed the update but before it could send the
+      // ACK to the executor). This is ok because the status update
+      // manager correctly handles duplicate updates.
       foreach (const StatusUpdate& update, updates) {
-        // The status update manager might have already checkpointed some
-        // of these pending updates (for e.g: if the slave died right
-        // after it checkpointed the update but before it could send the
-        // ACK to the executor). If so, we can just ignore those updates.
-        if (!executor->updates.contains(
-            update.status().task_id(), UUID::fromBytes(update.uuid()))) {
-          statusUpdate(update); // This also updates the executor's resources!
-        }
+        statusUpdate(update); // This also updates the executor's resources!
       }
 
       // Now, if there is any task still in STAGING state and not in
@@ -1720,8 +1713,13 @@ void Slave::statusUpdate(const StatusUpdate& update)
   if (executor == NULL) {
     LOG(WARNING)  << "Could not find the executor for "
                   << "status update " << update;
-    stats.invalidStatusUpdates++;
+    stats.validStatusUpdates++;
 
+    // NOTE: We forward the update here because this update could be
+    // generated by the slave when the executor is unknown to it,
+    // e.g., killTask(), _runTask().
+    // TODO(vinod): Revisit these semantics when we disallow updates
+    // sent by executors that are unknown to the slave.
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1740,11 +1738,10 @@ void Slave::statusUpdate(const StatusUpdate& update)
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
-  executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
 
   // Handle the task appropriately if it's terminated.
   if (protobuf::isTerminalState(status.state())) {
-    executor->removeTask(status.task_id());
+    executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
     dispatch(isolator,
@@ -2152,10 +2149,10 @@ void Slave::executorTerminated(
         send(master, message);
       }
 
-      // Remove the executor if either there are no pending updates
-      // or the framework is terminating.
-      if (executor->updates.empty() ||
-          framework->state == Framework::TERMINATING) {
+      // Remove the executor if either the framework is terminating or
+      // there are no incomplete tasks.
+      if (framework->state == Framework::TERMINATING ||
+          !executor->incompleteTasks()) {
         remove(framework, executor);
       }
       break;
@@ -2186,8 +2183,8 @@ void Slave::remove(Framework* framework, Executor* executor)
   // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
   CHECK(executor->state == Executor::TERMINATED) << executor->state;
-  CHECK (executor->updates.empty() ||
-         framework->state == Framework::TERMINATING);
+  CHECK(framework->state == Framework::TERMINATING ||
+        !executor->incompleteTasks());
 
   // TODO(vinod): Move the responsibility of gc'ing to the
   // Executor struct.
@@ -2789,7 +2786,7 @@ Executor* Framework::getExecutor(const TaskID& taskId)
   foreachvalue (Executor* executor, executors) {
     if (executor->queuedTasks.contains(taskId) ||
         executor->launchedTasks.contains(taskId) ||
-        executor->updates.contains(taskId)) {
+        executor->terminatedTasks.contains(taskId)) {
       return executor;
     }
   }
@@ -2941,23 +2938,36 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::removeTask(const TaskID& taskId)
+void Executor::terminateTask(
+    const TaskID& taskId,
+    const mesos::TaskState& state)
 {
+  Task* task = NULL;
   // Remove the task if it's queued.
-  queuedTasks.erase(taskId);
-
-  // Update the resources if it's been launched.
-  if (launchedTasks.contains(taskId)) {
-    Task* task = launchedTasks[taskId];
+  if (queuedTasks.contains(taskId)) {
+    task = new Task(
+        protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+  } else if (launchedTasks.contains(taskId)) {
+    // Update the resources if it's been launched.
+    task = launchedTasks[taskId];
     foreach (const Resource& resource, task->resources()) {
       resources -= resource;
     }
     launchedTasks.erase(taskId);
+  }
 
-    completedTasks.push_back(*task);
+  terminatedTasks[taskId] = CHECK_NOTNULL(task);
+}
 
-    delete task;
-  }
+
+void Executor::completeTask(const TaskID& taskId)
+{
+  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+
+  Task* task = terminatedTasks[taskId];
+  completedTasks.push_back(*task);
+  terminatedTasks.erase(taskId);
+  delete task;
 }
 
 
@@ -3001,16 +3011,14 @@ void Executor::recoverTask(const TaskState& state)
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
     updateTaskState(state.id, update.status().state());
-    updates.put(state.id, UUID::fromBytes(update.uuid()));
 
-    // Remove the task if it received a terminal update.
+    // Terminate the task if it received a terminal update.
     if (protobuf::isTerminalState(update.status().state())) {
-      removeTask(state.id);
+      terminateTask(state.id, update.status().state());
 
-      // If the terminal update has been acknowledged, remove it
-      // from pending tasks.
+      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
-        updates.remove(state.id, UUID::fromBytes(update.uuid()));
+        completeTask(state.id);
       }
       break;
     }
@@ -3026,6 +3034,14 @@ void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
 }
 
 
+bool Executor::incompleteTasks()
+{
+  return !queuedTasks.empty() ||
+         !launchedTasks.empty() ||
+         !terminatedTasks.empty();
+}
+
+
 std::ostream& operator << (std::ostream& stream, Framework::State state) {
   switch (state) {
     case Framework::RUNNING:     return stream << "RUNNING";

http://git-wip-us.apache.org/repos/asf/incubator-mesos/blob/15c00409/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 26dc96e..5aba7ed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -335,11 +335,15 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void removeTask(const TaskID& taskId);
+  void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
+  void completeTask(const TaskID& taskId);
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
   void updateTaskState(const TaskID& taskId, TaskState state);
 
+  // Returns true if there are any queued/launched/terminated tasks.
+  bool incompleteTasks();
+
   enum State {
     REGISTERING,  // Executor is launched but not (re-)registered yet.
     RUNNING,      // Executor has (re-)registered.
@@ -367,13 +371,13 @@ struct Executor
 
   Resources resources; // Currently consumed resources.
 
-  hashmap<TaskID, TaskInfo> queuedTasks;
-  hashmap<TaskID, Task*> launchedTasks;
+  hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+  hashmap<TaskID, Task*> launchedTasks;  // Running.
+  hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+  boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
   multihashmap<TaskID, UUID> updates; // Pending updates.
 
-  boost::circular_buffer<Task> completedTasks;
-
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.