You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by qi...@apache.org on 2020/03/20 09:03:08 UTC

[mesos] 11/21: Set resource limits when updating executor container.

This is an automated email from the ASF dual-hosted git repository.

qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 1bc52e7fd261e1092a5b9618e44977a936cb9d80
Author: Qian Zhang <zh...@gmail.com>
AuthorDate: Fri Jan 3 16:31:53 2020 +0800

    Set resource limits when updating executor container.
    
    Review: https://reviews.apache.org/r/71952
---
 src/slave/slave.cpp | 135 ++++++++++++++++++++++++++++++++++++++--------------
 src/slave/slave.hpp |   6 ++-
 2 files changed, 104 insertions(+), 37 deletions(-)

diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f214560..6a48023 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3347,15 +3347,23 @@ void Slave::__run(
                 << " for executor " << *executor;
 
       const ContainerID& containerId = executor->containerId;
-      const Resources& resources = executor->allocatedResources();
-
-      publishResources(containerId, resources)
-        .then(defer(self(), [this, containerId, resources] {
-          // NOTE: The executor struct could have been removed before
-          // containerizer update, so we use the captured container ID and
-          // resources here. If this happens, the containerizer would simply
-          // skip updating a destroyed container.
-          return containerizer->update(containerId, resources);
+      const Resources& resourceRequests = executor->allocatedResources();
+      const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+        computeExecutorLimits(
+            executor->info.resources(),
+            executor->queuedTasks.values(),
+            executor->launchedTasks.values());
+
+      publishResources(containerId, resourceRequests)
+        .then(defer(
+            self(),
+            [this, containerId, resourceRequests, resourceLimits] {
+              // NOTE: The executor struct could have been removed before
+              // containerizer update, so we use the captured container ID,
+              // resource requests and limits here. If this happens, the
+              // containerizer would simply skip updating a destroyed container.
+              return containerizer->update(
+                  containerId, resourceRequests, resourceLimits);
         }))
         .onAny(defer(self(),
                      &Self::___run,
@@ -5329,7 +5337,12 @@ void Slave::subscribe(
       }
 
       const ContainerID& containerId = executor->containerId;
-      const Resources& resources = executor->allocatedResources();
+      const Resources& resourceRequests = executor->allocatedResources();
+      const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+        computeExecutorLimits(
+            executor->info.resources(),
+            executor->queuedTasks.values(),
+            executor->launchedTasks.values());
 
       Future<Nothing> resourcesPublished;
       if (executor->queuedTasks.empty()) {
@@ -5343,16 +5356,19 @@ void Slave::subscribe(
         // after use. See comments in `publishResources` for details.
         resourcesPublished = Nothing();
       } else {
-        resourcesPublished = publishResources(containerId, resources);
+        resourcesPublished = publishResources(containerId, resourceRequests);
       }
 
       resourcesPublished
-        .then(defer(self(), [this, containerId, resources] {
-          // NOTE: The executor struct could have been removed before
-          // containerizer update, so we use the captured container ID and
-          // resources here. If this happens, the containerizer would simply
-          // skip updating a destroyed container.
-          return containerizer->update(containerId, resources);
+        .then(defer(
+            self(),
+            [this, containerId, resourceRequests, resourceLimits] {
+              // NOTE: The executor struct could have been removed before
+              // containerizer update, so we use the captured container ID,
+              // resource requests and limits here. If this happens, the
+              // containerizer would simply skip updating a destroyed container.
+              return containerizer->update(
+                  containerId, resourceRequests, resourceLimits);
         }))
         .onAny(defer(self(),
                      &Self::___run,
@@ -5506,15 +5522,23 @@ void Slave::registerExecutor(
       }
 
       const ContainerID& containerId = executor->containerId;
-      const Resources& resources = executor->allocatedResources();
-
-      publishResources(containerId, resources)
-        .then(defer(self(), [this, containerId, resources] {
-          // NOTE: The executor struct could have been removed before
-          // containerizer update, so we use the captured container ID and
-          // resources here. If this happens, the containerizer would simply
-          // skip updating a destroyed container.
-          return containerizer->update(containerId, resources);
+      const Resources& resourceRequests = executor->allocatedResources();
+      const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+        computeExecutorLimits(
+            executor->info.resources(),
+            executor->queuedTasks.values(),
+            executor->launchedTasks.values());
+
+      publishResources(containerId, resourceRequests)
+        .then(defer(
+            self(),
+            [this, containerId, resourceRequests, resourceLimits] {
+              // NOTE: The executor struct could have been removed before
+              // containerizer update, so we use the captured container ID,
+              // resource requests and limits here. If this happens, the
+              // containerizer would simply skip updating a destroyed container.
+              return containerizer->update(
+                  containerId, resourceRequests, resourceLimits);
         }))
         .onAny(defer(self(),
                      &Self::___run,
@@ -5655,7 +5679,11 @@ void Slave::reregisterExecutor(
       // Tell the containerizer to update the resources.
       containerizer->update(
           executor->containerId,
-          executor->allocatedResources())
+          executor->allocatedResources(),
+          computeExecutorLimits(
+              executor->info.resources(),
+              executor->queuedTasks.values(),
+              executor->launchedTasks.values()))
         .onAny(defer(self(),
                      &Self::_reregisterExecutor,
                      lambda::_1,
@@ -6199,7 +6227,13 @@ void Slave::_statusUpdate(
     // have been updated before sending the status update. Note that
     // duplicate terminal updates are not possible here because they
     // lead to an error from `Executor::updateTaskState`.
-    containerizer->update(executor->containerId, executor->allocatedResources())
+    containerizer->update(
+        executor->containerId,
+        executor->allocatedResources(),
+        computeExecutorLimits(
+            executor->info.resources(),
+            executor->queuedTasks.values(),
+            executor->launchedTasks.values()))
       .onAny(defer(self(),
                    &Slave::__statusUpdate,
                    lambda::_1,
@@ -9980,17 +10014,46 @@ void Slave::initializeResourceProviderManager(
 
 google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
     const Resources& executorResources,
-    const vector<TaskInfo>& tasks) const
+    const vector<TaskInfo>& taskInfos,
+    const vector<Task*>& tasks) const
 {
   Option<Value::Scalar> executorCpuLimit, executorMemLimit;
   Value::Scalar cpuRequest, memRequest;
-  foreach (const TaskInfo& task, tasks) {
+  foreach (const TaskInfo& taskInfo, taskInfos) {
+    // Count the task's CPU limit into the executor's CPU limit.
+    if (taskInfo.limits().count("cpus")) {
+      setLimit(executorCpuLimit, taskInfo.limits().at("cpus"));
+    } else {
+      Option<Value::Scalar> taskCpus =
+        Resources(taskInfo.resources()).get<Value::Scalar>("cpus");
+
+      if (taskCpus.isSome()) {
+        cpuRequest += taskCpus.get();
+      }
+    }
+
+    // Count the task's memory limit into the executor's memory limit.
+    if (taskInfo.limits().count("mem")) {
+      setLimit(executorMemLimit, taskInfo.limits().at("mem"));
+    } else {
+      Option<Value::Scalar> taskMem =
+        Resources(taskInfo.resources()).get<Value::Scalar>("mem");
+
+      if (taskMem.isSome()) {
+        memRequest += taskMem.get();
+      }
+    }
+  }
+
+  foreach (const Task* task, tasks) {
+    CHECK_NOTNULL(task);
+
     // Count the task's CPU limit into the executor's CPU limit.
-    if (task.limits().count("cpus")) {
-      setLimit(executorCpuLimit, task.limits().at("cpus"));
+    if (task->limits().count("cpus")) {
+      setLimit(executorCpuLimit, task->limits().at("cpus"));
     } else {
       Option<Value::Scalar> taskCpus =
-        Resources(task.resources()).get<Value::Scalar>("cpus");
+        Resources(task->resources()).get<Value::Scalar>("cpus");
 
       if (taskCpus.isSome()) {
         cpuRequest += taskCpus.get();
@@ -9998,11 +10061,11 @@ google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
     }
 
     // Count the task's memory limit into the executor's memory limit.
-    if (task.limits().count("mem")) {
-      setLimit(executorMemLimit, task.limits().at("mem"));
+    if (task->limits().count("mem")) {
+      setLimit(executorMemLimit, task->limits().at("mem"));
     } else {
       Option<Value::Scalar> taskMem =
-        Resources(task.resources()).get<Value::Scalar>("mem");
+        Resources(task->resources()).get<Value::Scalar>("mem");
 
       if (taskMem.isSome()) {
         memRequest += taskMem.get();
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a5a367e..d7e65e0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -779,9 +779,13 @@ private:
       const Flags& flags,
       const SlaveID& slaveId);
 
+  // This function is used to compute limits for executors before they
+  // are launched as well as when updating running executors, so we must
+  // accept both `TaskInfo` and `Task` types to handle both cases.
   google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits(
       const Resources& executorResources,
-      const std::vector<TaskInfo>& tasks) const;
+      const std::vector<TaskInfo>& taskInfos,
+      const std::vector<Task*>& tasks = {}) const;
 
   protobuf::master::Capabilities requiredMasterCapabilities;