You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/07/18 01:39:14 UTC

[1/2] mesos git commit: Added a method in the agent exposing an executor's allocated resources.

Repository: mesos
Updated Branches:
  refs/heads/master 92896871b -> f10ee7e63


Added a method in the agent exposing an executor's allocated resources.

This allows call-sites to obtain the executor's allocated resources
with a single method call, instead of each call-site having to iterate
over the executor's queued and launched tasks.

Review: https://reviews.apache.org/r/60867/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38a26842
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38a26842
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38a26842

Branch: refs/heads/master
Commit: 38a268428160373444187911d121dafc66023fc1
Parents: 9289687
Author: Andrei Budnik <ab...@mesosphere.com>
Authored: Mon Jul 17 17:41:17 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Mon Jul 17 17:41:17 2017 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp | 16 ++++++++++++++++
 src/slave/slave.hpp |  2 ++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/38a26842/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index adbe65f..2f25c0c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7836,6 +7836,22 @@ Option<TaskGroupInfo> Executor::getQueuedTaskGroup(const TaskID& taskId)
 }
 
 
+Resources Executor::allocatedResources() const
+{
+  Resources allocatedResources = info.resources();
+
+  foreachvalue (const TaskInfo& task, queuedTasks) {
+    allocatedResources += task.resources();
+  }
+
+  foreachvalue (const Task* task, launchedTasks) {
+    allocatedResources += task->resources();
+  }
+
+  return allocatedResources;
+}
+
+
 map<string, string> executorEnvironment(
     const Flags& flags,
     const ExecutorInfo& executorInfo,

http://git-wip-us.apache.org/repos/asf/mesos/blob/38a26842/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 8bb03ec..094f464 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -749,6 +749,8 @@ public:
   // Returns the task group associated with the task.
   Option<TaskGroupInfo> getQueuedTaskGroup(const TaskID& taskId);
 
+  Resources allocatedResources() const;
+
   enum State
   {
     REGISTERING,  // Executor is launched but not (re-)registered yet.


[2/2] mesos git commit: Removed Executor::resources usage in the agent.

Posted by bm...@apache.org.
Removed Executor::resources usage in the agent.

We have now added Executor::allocatedResources(), which obviates the
need for Executor::resources. Note that some call-sites that accessed
Executor::resources were forgetting to include the queued tasks, so
this patch also fixes the following bugs:

(1) The metrics and endpoint information about allocated resources
    omitted the queued tasks.

(2) There was a race in which `Slave::__run` calls
    `containerizer->update` with resources that include the queued
    tasks; if `Slave::_statusUpdate` then runs immediately afterwards
    and calls `containerizer->update` without the queued tasks, the
    executor ends up sized below what it should be.

Review: https://reviews.apache.org/r/60907/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f10ee7e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f10ee7e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f10ee7e6

Branch: refs/heads/master
Commit: f10ee7e636fcb805c0cd78b428e705e30860e4ee
Parents: 38a2684
Author: Andrei Budnik <ab...@mesosphere.com>
Authored: Mon Jul 17 17:43:47 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Mon Jul 17 17:43:47 2017 -0700

----------------------------------------------------------------------
 src/slave/http.cpp  |  2 +-
 src/slave/slave.cpp | 77 ++++++++++++------------------------------------
 src/slave/slave.hpp |  3 --
 3 files changed, 20 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 60640e5..5c23bfe 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -185,7 +185,7 @@ struct ExecutorWriter
     writer->field("source", executor_->info.source());
     writer->field("container", executor_->containerId.value());
     writer->field("directory", executor_->directory);
-    writer->field("resources", executor_->resources);
+    writer->field("resources", executor_->allocatedResources());
 
     // Resources may be empty for command executors.
     if (!executor_->info.resources().empty()) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2f25c0c..7381530 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2416,17 +2416,9 @@ void Slave::__run(
       LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
                 << " for executor " << *executor;
 
-      // Update the resource limits for the container. Note that the
-      // resource limits include the currently queued tasks because we
-      // want the container to have enough resources to hold the
-      // upcoming tasks.
-      Resources resources = executor->resources;
-
-      foreachvalue (const TaskInfo& _task, executor->queuedTasks) {
-        resources += _task.resources();
-      }
-
-      containerizer->update(executor->containerId, resources)
+      containerizer->update(
+          executor->containerId,
+          executor->allocatedResources())
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -3790,16 +3782,6 @@ void Slave::subscribe(
             None());
       }
 
-      // Update the resource limits for the container. Note that the
-      // resource limits include the currently queued tasks because we
-      // want the container to have enough resources to hold the
-      // upcoming tasks.
-      Resources resources = executor->resources;
-
-      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
-        resources += task.resources();
-      }
-
       // We maintain a copy of the tasks in `queuedTaskGroups` also in
       // `queuedTasks`. Hence, we need to ensure that we don't send the same
       // tasks to the executor twice.
@@ -3819,7 +3801,9 @@ void Slave::subscribe(
         }
       }
 
-      containerizer->update(executor->containerId, resources)
+      containerizer->update(
+          executor->containerId,
+          executor->allocatedResources())
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -4012,16 +3996,6 @@ void Slave::registerExecutor(
       message.mutable_slave_info()->MergeFrom(info);
       executor->send(message);
 
-      // Update the resource limits for the container. Note that the
-      // resource limits include the currently queued tasks because we
-      // want the container to have enough resources to hold the
-      // upcoming tasks.
-      Resources resources = executor->resources;
-
-      foreachvalue (const TaskInfo& task, executor->queuedTasks) {
-        resources += task.resources();
-      }
-
       // We maintain a copy of the tasks in `queuedTaskGroups` also in
       // `queuedTasks`. Hence, we need to ensure that we don't send the same
       // tasks to the executor twice.
@@ -4041,7 +4015,9 @@ void Slave::registerExecutor(
         }
       }
 
-      containerizer->update(executor->containerId, resources)
+      containerizer->update(
+          executor->containerId,
+          executor->allocatedResources())
         .onAny(defer(self(),
                      &Self::___run,
                      lambda::_1,
@@ -4179,7 +4155,9 @@ void Slave::reregisterExecutor(
       }
 
       // Tell the containerizer to update the resources.
-      containerizer->update(executor->containerId, executor->resources)
+      containerizer->update(
+          executor->containerId,
+          executor->allocatedResources())
         .onAny(defer(self(),
                      &Self::_reregisterExecutor,
                      lambda::_1,
@@ -4623,7 +4601,7 @@ void Slave::_statusUpdate(
     // have been updated before sending the status update. Note that
     // duplicate terminal updates are not possible here because they
     // lead to an error from `Executor::updateTaskState`.
-    containerizer->update(executor->containerId, executor->resources)
+    containerizer->update(executor->containerId, executor->allocatedResources())
       .onAny(defer(self(),
                    &Slave::__statusUpdate,
                    lambda::_1,
@@ -6445,9 +6423,8 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
     // calculating the available oversubscribed resources to offer.
     Resources oversubscribed;
     foreachvalue (Framework* framework, frameworks) {
-      foreachvalue (Executor* executor, framework->executors) {
-        oversubscribed += unallocated(executor->resources.revocable());
-      }
+      oversubscribed += unallocated(
+          framework->allocatedResources().revocable());
     }
 
     // Add oversubscribable resources to the total.
@@ -6647,7 +6624,7 @@ Future<ResourceUsage> Slave::usage()
 
       ResourceUsage::Executor* entry = usage->add_executors();
       entry->mutable_executor_info()->CopyFrom(executor->info);
-      entry->mutable_allocated()->CopyFrom(executor->resources);
+      entry->mutable_allocated()->CopyFrom(executor->allocatedResources());
       entry->mutable_container_id()->CopyFrom(executor->containerId);
 
       // We include non-terminal tasks in ResourceUsage.
@@ -7012,9 +6989,7 @@ double Slave::_resources_used(const string& name)
   Resources used;
 
   foreachvalue (Framework* framework, frameworks) {
-    foreachvalue (Executor* executor, framework->executors) {
-      used += executor->resources.nonRevocable();
-    }
+    used += framework->allocatedResources().nonRevocable();
   }
 
   return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
@@ -7056,9 +7031,7 @@ double Slave::_resources_revocable_used(const string& name)
   Resources used;
 
   foreachvalue (Framework* framework, frameworks) {
-    foreachvalue (Executor* executor, framework->executors) {
-      used += executor->resources.revocable();
-    }
+    used += framework->allocatedResources().revocable();
   }
 
   return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
@@ -7484,9 +7457,7 @@ Resources Framework::allocatedResources() const
   Resources allocated;
 
   foreachvalue (const Executor* executor, executors) {
-    // TODO(abudnik): Currently `Executor::resources` does not include
-    // the executor's queued tasks!
-    allocated += executor->resources;
+    allocated += executor->allocatedResources();
   }
 
   hashset<ExecutorID> pendingExecutors;
@@ -7530,7 +7501,6 @@ Executor::Executor(
     checkpoint(_checkpoint),
     http(None()),
     pid(None()),
-    resources(_info.resources()),
     completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
 {
   CHECK_NOTNULL(slave);
@@ -7579,8 +7549,6 @@ Task* Executor::addTask(const TaskInfo& task)
 
   launchedTasks[task.task_id()] = t;
 
-  resources += task.resources();
-
   return t;
 }
 
@@ -7660,12 +7628,6 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
 
   launchedTasks[state.id] = task;
 
-  // NOTE: Since some tasks might have been terminated when the
-  // slave was down, the executor resources we capture here is an
-  // upper-bound. The actual resources needed (for live tasks) by
-  // the isolator will be calculated when the executor re-registers.
-  resources += state.info->resources();
-
   // Read updates to get the latest state of the task.
   foreach (const StatusUpdate& update, state.updates) {
     Try<Nothing> updated = updateTaskState(update.status());
@@ -7754,7 +7716,6 @@ Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
     task = launchedTasks.at(status.task_id());
 
     if (terminal) {
-      resources -= task->resources(); // Release the resources.
       launchedTasks.erase(taskId);
     }
   } else if (terminatedTasks.contains(taskId)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 094f464..1fe93da 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -795,9 +795,6 @@ public:
   Option<HttpConnection> http;
   Option<process::UPID> pid;
 
-  // Currently consumed resources.
-  Resources resources;
-
   // Tasks can be found in one of the following four data structures:
   //
   // TODO(bmahler): Make these private to enforce that the task