You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2013/08/13 21:49:46 UTC

[02/23] git commit: Fixed slave to properly handle terminated tasks that have pending updates.

Fixed slave to properly handle terminated tasks that have pending
updates.

Review: https://reviews.apache.org/r/11694


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/22e64506
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/22e64506
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/22e64506

Branch: refs/heads/vinod/0.13.0
Commit: 22e64506da12dc29ecd101bf73702c8287811bf2
Parents: 3f809e0
Author: Vinod Kone <vi...@twitter.com>
Authored: Wed Jun 5 22:47:57 2013 -0700
Committer: Vinod Kone <vi...@twitter.com>
Committed: Fri Jun 7 15:19:28 2013 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 116 +++++++++++++++++++++++++++--------------------
 src/slave/slave.hpp |  14 ++++--
 2 files changed, 75 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/22e64506/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 8ce1646..840c64d 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1041,21 +1041,14 @@ void Slave::killTask(const FrameworkID& frameworkId, const TaskID& taskId)
 
   switch (executor->state) {
     case Executor::REGISTERING: {
-      if (executor->queuedTasks.contains(taskId)) {
-        // We remove the task here so that if this executor registers at
-        // a later point in time it won't be sent this task.
-        LOG(WARNING) << "Removing queued task " << taskId
-                     << " from executor '" << executor->id
-                     << "' of framework " << frameworkId
-                     << " because the executor hasn't registered yet";
-        executor->removeTask(taskId);
-      } else {
-        LOG(WARNING) << "Cannot kill task " << taskId
-                     << " of framework " << frameworkId
-                     << " because the executor '" << executor->id
-                     << "' hasn't registered yet";
-      }
+      LOG(WARNING) << "Removing queued task " << taskId
+                   << " of framework " << frameworkId
+                   << " because the executor '" << executor->id
+                   << "' hasn't registered yet";
 
+      // NOTE: Sending a TASK_KILLED update removes the task from
+      // Executor::queuedTasks, so that if the executor registers at
+      // a later point in time, it won't get this task.
       const StatusUpdate& update = protobuf::createStatusUpdate(
           frameworkId,
           info.id(),
@@ -1376,11 +1369,13 @@ void Slave::_statusUpdateAcknowledgement(
         executor->state == Executor::TERMINATED)
     << executor->state;
 
-  executor->updates.remove(taskId, uuid);
+  if (executor->terminatedTasks.contains(taskId)) {
+    executor->completeTask(taskId);
+  }
 
   // Remove the executor if it has terminated and there are no more
-  // pending updates.
-  if (executor->state == Executor::TERMINATED && executor->updates.empty()) {
+  // incomplete tasks.
+  if (executor->state == Executor::TERMINATED && !executor->incompleteTasks()) {
     remove(framework, executor);
   }
 }
@@ -1594,15 +1589,13 @@ void Slave::reregisterExecutor(
       send(executor->pid, message);
 
       // Handle all the pending updates.
+      // The status update manager might have already checkpointed some
+      // of these pending updates (for example, if the slave died right
+      // after it checkpointed the update but before it could send the
+      // ACK to the executor). This is ok because the status update
+      // manager correctly handles duplicate updates.
       foreach (const StatusUpdate& update, updates) {
-        // The status update manager might have already checkpointed some
-        // of these pending updates (for e.g: if the slave died right
-        // after it checkpointed the update but before it could send the
-        // ACK to the executor). If so, we can just ignore those updates.
-        if (!executor->updates.contains(
-            update.status().task_id(), UUID::fromBytes(update.uuid()))) {
-          statusUpdate(update); // This also updates the executor's resources!
-        }
+        statusUpdate(update); // This also updates the executor's resources!
       }
 
       // Now, if there is any task still in STAGING state and not in
@@ -1720,8 +1713,13 @@ void Slave::statusUpdate(const StatusUpdate& update)
   if (executor == NULL) {
     LOG(WARNING)  << "Could not find the executor for "
                   << "status update " << update;
-    stats.invalidStatusUpdates++;
+    stats.validStatusUpdates++;
 
+    // NOTE: We forward the update here because this update could be
+    // generated by the slave when the executor is unknown to it,
+    // e.g., killTask(), _runTask().
+    // TODO(vinod): Revisit these semantics when we disallow updates
+    // sent by executors that are unknown to the slave.
     statusUpdateManager->update(update, info.id())
       .onAny(defer(self(), &Slave::_statusUpdate, params::_1, update, None()));
 
@@ -1740,11 +1738,10 @@ void Slave::statusUpdate(const StatusUpdate& update)
   stats.validStatusUpdates++;
 
   executor->updateTaskState(status.task_id(), status.state());
-  executor->updates.put(status.task_id(), UUID::fromBytes(update.uuid()));
 
   // Handle the task appropriately if it's terminated.
   if (protobuf::isTerminalState(status.state())) {
-    executor->removeTask(status.task_id());
+    executor->terminateTask(status.task_id(), status.state());
 
     // Tell the isolator to update the resources.
     dispatch(isolator,
@@ -2152,10 +2149,10 @@ void Slave::executorTerminated(
         send(master, message);
       }
 
-      // Remove the executor if either there are no pending updates
-      // or the framework is terminating.
-      if (executor->updates.empty() ||
-          framework->state == Framework::TERMINATING) {
+      // Remove the executor if either the framework is terminating or
+      // there are no incomplete tasks.
+      if (framework->state == Framework::TERMINATING ||
+          !executor->incompleteTasks()) {
         remove(framework, executor);
       }
       break;
@@ -2186,8 +2183,8 @@ void Slave::remove(Framework* framework, Executor* executor)
   // care for pending updates when a framework is terminating
   // because the framework cannot ACK them.
   CHECK(executor->state == Executor::TERMINATED) << executor->state;
-  CHECK (executor->updates.empty() ||
-         framework->state == Framework::TERMINATING);
+  CHECK(framework->state == Framework::TERMINATING ||
+        !executor->incompleteTasks());
 
   // TODO(vinod): Move the responsibility of gc'ing to the
   // Executor struct.
@@ -2789,7 +2786,7 @@ Executor* Framework::getExecutor(const TaskID& taskId)
   foreachvalue (Executor* executor, executors) {
     if (executor->queuedTasks.contains(taskId) ||
         executor->launchedTasks.contains(taskId) ||
-        executor->updates.contains(taskId)) {
+        executor->terminatedTasks.contains(taskId)) {
       return executor;
     }
   }
@@ -2941,23 +2938,36 @@ Task* Executor::addTask(const TaskInfo& task)
 }
 
 
-void Executor::removeTask(const TaskID& taskId)
+void Executor::terminateTask(
+    const TaskID& taskId,
+    const mesos::TaskState& state)
 {
+  Task* task = NULL;
   // Remove the task if it's queued.
-  queuedTasks.erase(taskId);
-
-  // Update the resources if it's been launched.
-  if (launchedTasks.contains(taskId)) {
-    Task* task = launchedTasks[taskId];
+  if (queuedTasks.contains(taskId)) {
+    task = new Task(
+        protobuf::createTask(queuedTasks[taskId], state, id, frameworkId));
+  } else if (launchedTasks.contains(taskId)) {
+    // Update the resources if it's been launched.
+    task = launchedTasks[taskId];
     foreach (const Resource& resource, task->resources()) {
       resources -= resource;
     }
     launchedTasks.erase(taskId);
+  }
 
-    completedTasks.push_back(*task);
+  terminatedTasks[taskId] = CHECK_NOTNULL(task);
+}
 
-    delete task;
-  }
+
+void Executor::completeTask(const TaskID& taskId)
+{
+  CHECK(terminatedTasks.contains(taskId)) << "Unknown task " << taskId;
+
+  Task* task = terminatedTasks[taskId];
+  completedTasks.push_back(*task);
+  terminatedTasks.erase(taskId);
+  delete task;
 }
 
 
@@ -3001,16 +3011,14 @@ void Executor::recoverTask(const TaskState& state)
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
     updateTaskState(state.id, update.status().state());
-    updates.put(state.id, UUID::fromBytes(update.uuid()));
 
-    // Remove the task if it received a terminal update.
+    // Terminate the task if it received a terminal update.
     if (protobuf::isTerminalState(update.status().state())) {
-      removeTask(state.id);
+      terminateTask(state.id, update.status().state());
 
-      // If the terminal update has been acknowledged, remove it
-      // from pending tasks.
+      // If the terminal update has been acknowledged, remove it.
       if (state.acks.contains(UUID::fromBytes(update.uuid()))) {
-        updates.remove(state.id, UUID::fromBytes(update.uuid()));
+        completeTask(state.id);
       }
       break;
     }
@@ -3026,6 +3034,14 @@ void Executor::updateTaskState(const TaskID& taskId, mesos::TaskState state)
 }
 
 
+bool Executor::incompleteTasks()
+{
+  return !queuedTasks.empty() ||
+         !launchedTasks.empty() ||
+         !terminatedTasks.empty();
+}
+
+
 std::ostream& operator << (std::ostream& stream, Framework::State state) {
   switch (state) {
     case Framework::RUNNING:     return stream << "RUNNING";

http://git-wip-us.apache.org/repos/asf/mesos/blob/22e64506/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 26dc96e..5aba7ed 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -335,11 +335,15 @@ struct Executor
   ~Executor();
 
   Task* addTask(const TaskInfo& task);
-  void removeTask(const TaskID& taskId);
+  void terminateTask(const TaskID& taskId, const mesos::TaskState& state);
+  void completeTask(const TaskID& taskId);
   void checkpointTask(const TaskInfo& task);
   void recoverTask(const state::TaskState& state);
   void updateTaskState(const TaskID& taskId, TaskState state);
 
+  // Returns true if there are any queued/launched/terminated tasks.
+  bool incompleteTasks();
+
   enum State {
     REGISTERING,  // Executor is launched but not (re-)registered yet.
     RUNNING,      // Executor has (re-)registered.
@@ -367,13 +371,13 @@ struct Executor
 
   Resources resources; // Currently consumed resources.
 
-  hashmap<TaskID, TaskInfo> queuedTasks;
-  hashmap<TaskID, Task*> launchedTasks;
+  hashmap<TaskID, TaskInfo> queuedTasks; // Not yet launched.
+  hashmap<TaskID, Task*> launchedTasks;  // Running.
+  hashmap<TaskID, Task*> terminatedTasks; // Terminated but pending updates.
+  boost::circular_buffer<Task> completedTasks; // Terminated and updates acked.
 
   multihashmap<TaskID, UUID> updates; // Pending updates.
 
-  boost::circular_buffer<Task> completedTasks;
-
 private:
   Executor(const Executor&);              // No copying.
   Executor& operator = (const Executor&); // No assigning.