You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2014/09/19 03:24:59 UTC

[2/2] git commit: Hold on to unacknowledged terminal tasks in the Master.

Hold on to unacknowledged terminal tasks in the Master.

Review: https://reviews.apache.org/r/25567


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b3182fbd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b3182fbd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b3182fbd

Branch: refs/heads/master
Commit: b3182fbd138137b7864d7bc26cb45e67deaabac0
Parents: 3636c0e
Author: Benjamin Mahler <bm...@twitter.com>
Authored: Thu Sep 11 10:49:24 2014 -0700
Committer: Benjamin Mahler <bm...@twitter.com>
Committed: Thu Sep 18 18:24:51 2014 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 131 +++++++++++++++++++++++++++++----------------
 src/master/master.hpp |  11 +++-
 2 files changed, 94 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 52a7409..34803f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -651,7 +651,7 @@ void Master::finalize()
 
   // Remove the slaves.
   foreachvalue (Slave* slave, slaves.registered) {
-    // Remove tasks.
+    // Remove tasks, don't bother recovering resources.
     foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
       foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
         removeTask(task);
@@ -685,7 +685,8 @@ void Master::finalize()
   // allocator or the roles because it is unnecessary bookkeeping at
   // this point since we are shutting down.
   foreachvalue (Framework* framework, frameworks.registered) {
-    // Remove pending tasks from the framework.
+    // Remove pending tasks from the framework. Don't bother
+    // recovering the resources in the allocator.
     framework->pendingTasks.clear();
 
     // No tasks/executors/offers should remain since the slaves
@@ -2717,6 +2718,12 @@ void Master::statusUpdateAcknowledgement(
     return;
   }
 
+  Task* task = slave->getTask(frameworkId, taskId);
+
+  if (task != NULL && protobuf::isTerminalState(task->state())) {
+    removeTask(task);
+  }
+
   LOG(INFO) << "Forwarding status update acknowledgement "
             << UUID::fromBytes(uuid) << " for task " << taskId
             << " of framework " << frameworkId << " to slave " << *slave;
@@ -3192,8 +3199,7 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
   forward(update, pid, framework);
 
   // Lookup the task and see if we need to update anything locally.
-  const TaskStatus& status = update.status();
-  Task* task = slave->getTask(update.framework_id(), status.task_id());
+  Task* task = slave->getTask(update.framework_id(), update.status().task_id());
   if (task == NULL) {
     LOG(WARNING) << "Could not lookup task for status update " << update
                  << " from slave " << *slave;
@@ -3204,20 +3210,14 @@ void Master::statusUpdate(const StatusUpdate& update, const UPID& pid)
 
   LOG(INFO) << "Status update " << update << " from slave " << *slave;
 
-  // TODO(brenden) Consider wiping the `data` and `message` fields?
-  if (task->statuses_size() > 0 &&
-      task->statuses(task->statuses_size() - 1).state() == status.state()) {
-    task->mutable_statuses()->RemoveLast();
-  }
-  task->add_statuses()->CopyFrom(status);
-  task->set_state(status.state());
+  updateTask(task, update.status());
 
-  // Handle the task appropriately if it's terminated.
-  if (protobuf::isTerminalState(status.state())) {
+  // If the task is terminal and no acknowledgement is needed,
+  // then remove the task now.
+  if (protobuf::isTerminalState(task->state()) && pid == UPID()) {
     removeTask(task);
   }
 
-  stats.tasks[status.state()]++;
   stats.validStatusUpdates++;
   metrics.valid_status_updates++;
 }
@@ -3763,14 +3763,14 @@ void Master::reconcile(
   // missing from the slave. This could happen if the task was
   // dropped by the slave (e.g., slave exited before getting the
   // task or the task was launched while slave was in recovery).
-  // NOTE: copies are used because statusUpdate() modifies
-  //       slave->tasks.
+  // NOTE: copies are needed because removeTask modifies slave->tasks.
   foreachkey (const FrameworkID& frameworkId, utils::copy(slave->tasks)) {
     foreachvalue (Task* task, utils::copy(slave->tasks[frameworkId])) {
       if (!slaveTasks.contains(task->framework_id(), task->task_id())) {
-        LOG(WARNING) << "Sending TASK_LOST for task " << task->task_id()
+        LOG(WARNING) << "Task " << task->task_id()
                      << " of framework " << task->framework_id()
-                     << " unknown to the slave " << *slave;
+                     << " unknown to the slave " << *slave
+                     << " during re-registration";
 
         const StatusUpdate& update = protobuf::createStatusUpdate(
             task->framework_id(),
@@ -3779,7 +3779,13 @@ void Master::reconcile(
             TASK_LOST,
             "Task is unknown to the slave");
 
-        statusUpdate(update, UPID());
+        updateTask(task, update.status());
+        removeTask(task);
+
+        Framework* framework = getFramework(frameworkId);
+        if (framework != NULL) {
+          forward(update, UPID(), framework);
+        }
       }
     }
   }
@@ -4079,8 +4085,7 @@ void Master::removeFramework(Slave* slave, Framework* framework)
 
   // Remove pointers to framework's tasks in slaves, and send status
   // updates.
-  // NOTE: A copy is used because statusUpdate() modifies
-  //       slave->tasks.
+  // NOTE: A copy is needed because removeTask modifies slave->tasks.
   foreachvalue (Task* task, utils::copy(slave->tasks[framework->id])) {
     // Remove tasks that belong to this framework.
     if (task->framework_id() == framework->id) {
@@ -4096,7 +4101,9 @@ void Master::removeFramework(Slave* slave, Framework* framework)
         (task->has_executor_id()
             ? Option<ExecutorID>(task->executor_id()) : None()));
 
-      statusUpdate(update, UPID());
+      updateTask(task, update.status());
+      removeTask(task);
+      forward(update, UPID(), framework);
     }
   }
 
@@ -4184,11 +4191,6 @@ void Master::readdSlave(
   }
 
   foreach (const Task& task, tasks) {
-    // Ignore tasks that have reached terminal state.
-    if (protobuf::isTerminalState(task.state())) {
-      continue;
-    }
-
     Task* t = new Task(task);
 
     // Add the task to the slave.
@@ -4211,7 +4213,10 @@ void Master::readdSlave(
                    << " running on slave " << *slave;
     }
 
-    resources[task.framework_id()] += task.resources();
+    // Terminal tasks do not consume resources.
+    if (!protobuf::isTerminalState(task.state())) {
+      resources[task.framework_id()] += task.resources();
+    }
   }
 
   // Re-add completed tasks reported by the slave.
@@ -4269,8 +4274,7 @@ void Master::removeSlave(Slave* slave)
           (task->has_executor_id() ?
               Option<ExecutorID>(task->executor_id()) : None()));
 
-      task->add_statuses()->CopyFrom(update.status());
-      task->set_state(update.status().state());
+      updateTask(task, update.status());
       removeTask(task);
 
       updates.push_back(update);
@@ -4369,6 +4373,49 @@ void Master::_removeSlave(
 }
 
 
+void Master::updateTask(Task* task, const TaskStatus& status)
+{
+  // Out-of-order updates should not occur, however in case they
+  // do (e.g. MESOS-1799), prevent them here to ensure that the
+  // resource accounting is not affected.
+  if (protobuf::isTerminalState(task->state()) &&
+      !protobuf::isTerminalState(status.state())) {
+    LOG(ERROR) << "Ignoring out of order status update for task "
+               << task->task_id()
+               << " (" << task->state() << " -> " << status.state() << ")";
+    return;
+  }
+
+  // Once the task becomes terminal, we recover the resources.
+  if (!protobuf::isTerminalState(task->state()) &&
+      protobuf::isTerminalState(status.state())) {
+    allocator->resourcesRecovered(
+        task->framework_id(),
+        task->slave_id(),
+        task->resources(),
+        None());
+
+    switch (status.state()) {
+      case TASK_FINISHED: ++metrics.tasks_finished; break;
+      case TASK_FAILED:   ++metrics.tasks_failed;   break;
+      case TASK_KILLED:   ++metrics.tasks_killed;   break;
+      case TASK_LOST:     ++metrics.tasks_lost;     break;
+      default: break;
+    }
+  }
+
+  // TODO(brenden) Consider wiping the `data` and `message` fields?
+  if (task->statuses_size() > 0 &&
+      task->statuses(task->statuses_size() - 1).state() == status.state()) {
+    task->mutable_statuses()->RemoveLast();
+  }
+  task->add_statuses()->CopyFrom(status);
+  task->set_state(status.state());
+
+  stats.tasks[status.state()]++;
+}
+
+
 void Master::removeTask(Task* task)
 {
   CHECK_NOTNULL(task);
@@ -4381,6 +4428,14 @@ void Master::removeTask(Task* task)
                  << " of framework " << task->framework_id()
                  << " on slave " << *slave
                  << " in non-terminal state " << task->state();
+
+    // If the task is not terminal, then the resources have
+    // not yet been released.
+    allocator->resourcesRecovered(
+        task->framework_id(),
+        task->slave_id(),
+        task->resources(),
+        None());
   } else {
     LOG(INFO) << "Removing task " << task->task_id()
               << " with resources " << task->resources()
@@ -4397,22 +4452,6 @@ void Master::removeTask(Task* task)
   // Remove from slave.
   slave->removeTask(task);
 
-  // Tell the allocator about the recovered resources.
-  allocator->resourcesRecovered(
-      task->framework_id(),
-      task->slave_id(),
-      task->resources(),
-      None());
-
-  // Update the task state metric.
-  switch (task->state()) {
-    case TASK_FINISHED: ++metrics.tasks_finished; break;
-    case TASK_FAILED:   ++metrics.tasks_failed;   break;
-    case TASK_KILLED:   ++metrics.tasks_killed;   break;
-    case TASK_LOST:     ++metrics.tasks_lost;     break;
-    default: break;
-  }
-
   delete task;
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b3182fbd/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 80d7535..41da240 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -47,6 +47,7 @@
 #include <stout/multihashmap.hpp>
 #include <stout/option.hpp>
 
+#include "common/protobuf_utils.hpp"
 #include "common/type_utils.hpp"
 
 #include "files/files.hpp"
@@ -368,7 +369,11 @@ protected:
       const Filters& filters,
       const process::Future<std::list<process::Future<Option<Error> > > >& f);
 
-  // Remove a task and recover its resources.
+  // Transitions the task, and recovers resources if the task becomes
+  // terminal.
+  void updateTask(Task* task, const TaskStatus& status);
+
+  // Removes the task.
   void removeTask(Task* task);
 
   // Remove an executor and recover its resources.
@@ -908,7 +913,9 @@ struct Slave
 
     foreachkey (const FrameworkID& frameworkId, tasks) {
       foreachvalue (const Task* task, tasks.find(frameworkId)->second) {
-        used += task->resources();
+        if (!protobuf::isTerminalState(task->state())) {
+          used += task->resources();
+        }
       }
     }