You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by qi...@apache.org on 2020/03/20 09:03:08 UTC
[mesos] 11/21: Set resource limits when updating executor container.
This is an automated email from the ASF dual-hosted git repository.
qianzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 1bc52e7fd261e1092a5b9618e44977a936cb9d80
Author: Qian Zhang <zh...@gmail.com>
AuthorDate: Fri Jan 3 16:31:53 2020 +0800
Set resource limits when updating executor container.
Review: https://reviews.apache.org/r/71952
---
src/slave/slave.cpp | 135 ++++++++++++++++++++++++++++++++++++++--------------
src/slave/slave.hpp | 6 ++-
2 files changed, 104 insertions(+), 37 deletions(-)
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index f214560..6a48023 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3347,15 +3347,23 @@ void Slave::__run(
<< " for executor " << *executor;
const ContainerID& containerId = executor->containerId;
- const Resources& resources = executor->allocatedResources();
-
- publishResources(containerId, resources)
- .then(defer(self(), [this, containerId, resources] {
- // NOTE: The executor struct could have been removed before
- // containerizer update, so we use the captured container ID and
- // resources here. If this happens, the containerizer would simply
- // skip updating a destroyed container.
- return containerizer->update(containerId, resources);
+ const Resources& resourceRequests = executor->allocatedResources();
+ const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+ computeExecutorLimits(
+ executor->info.resources(),
+ executor->queuedTasks.values(),
+ executor->launchedTasks.values());
+
+ publishResources(containerId, resourceRequests)
+ .then(defer(
+ self(),
+ [this, containerId, resourceRequests, resourceLimits] {
+ // NOTE: The executor struct could have been removed before
+ // containerizer update, so we use the captured container ID,
+ // resource requests and limits here. If this happens, the
+ // containerizer would simply skip updating a destroyed container.
+ return containerizer->update(
+ containerId, resourceRequests, resourceLimits);
}))
.onAny(defer(self(),
&Self::___run,
@@ -5329,7 +5337,12 @@ void Slave::subscribe(
}
const ContainerID& containerId = executor->containerId;
- const Resources& resources = executor->allocatedResources();
+ const Resources& resourceRequests = executor->allocatedResources();
+ const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+ computeExecutorLimits(
+ executor->info.resources(),
+ executor->queuedTasks.values(),
+ executor->launchedTasks.values());
Future<Nothing> resourcesPublished;
if (executor->queuedTasks.empty()) {
@@ -5343,16 +5356,19 @@ void Slave::subscribe(
// after use. See comments in `publishResources` for details.
resourcesPublished = Nothing();
} else {
- resourcesPublished = publishResources(containerId, resources);
+ resourcesPublished = publishResources(containerId, resourceRequests);
}
resourcesPublished
- .then(defer(self(), [this, containerId, resources] {
- // NOTE: The executor struct could have been removed before
- // containerizer update, so we use the captured container ID and
- // resources here. If this happens, the containerizer would simply
- // skip updating a destroyed container.
- return containerizer->update(containerId, resources);
+ .then(defer(
+ self(),
+ [this, containerId, resourceRequests, resourceLimits] {
+ // NOTE: The executor struct could have been removed before
+ // containerizer update, so we use the captured container ID,
+ // resource requests and limits here. If this happens, the
+ // containerizer would simply skip updating a destroyed container.
+ return containerizer->update(
+ containerId, resourceRequests, resourceLimits);
}))
.onAny(defer(self(),
&Self::___run,
@@ -5506,15 +5522,23 @@ void Slave::registerExecutor(
}
const ContainerID& containerId = executor->containerId;
- const Resources& resources = executor->allocatedResources();
-
- publishResources(containerId, resources)
- .then(defer(self(), [this, containerId, resources] {
- // NOTE: The executor struct could have been removed before
- // containerizer update, so we use the captured container ID and
- // resources here. If this happens, the containerizer would simply
- // skip updating a destroyed container.
- return containerizer->update(containerId, resources);
+ const Resources& resourceRequests = executor->allocatedResources();
+ const google::protobuf::Map<string, Value::Scalar>& resourceLimits =
+ computeExecutorLimits(
+ executor->info.resources(),
+ executor->queuedTasks.values(),
+ executor->launchedTasks.values());
+
+ publishResources(containerId, resourceRequests)
+ .then(defer(
+ self(),
+ [this, containerId, resourceRequests, resourceLimits] {
+ // NOTE: The executor struct could have been removed before
+ // containerizer update, so we use the captured container ID,
+ // resource requests and limits here. If this happens, the
+ // containerizer would simply skip updating a destroyed container.
+ return containerizer->update(
+ containerId, resourceRequests, resourceLimits);
}))
.onAny(defer(self(),
&Self::___run,
@@ -5655,7 +5679,11 @@ void Slave::reregisterExecutor(
// Tell the containerizer to update the resources.
containerizer->update(
executor->containerId,
- executor->allocatedResources())
+ executor->allocatedResources(),
+ computeExecutorLimits(
+ executor->info.resources(),
+ executor->queuedTasks.values(),
+ executor->launchedTasks.values()))
.onAny(defer(self(),
&Self::_reregisterExecutor,
lambda::_1,
@@ -6199,7 +6227,13 @@ void Slave::_statusUpdate(
// have been updated before sending the status update. Note that
// duplicate terminal updates are not possible here because they
// lead to an error from `Executor::updateTaskState`.
- containerizer->update(executor->containerId, executor->allocatedResources())
+ containerizer->update(
+ executor->containerId,
+ executor->allocatedResources(),
+ computeExecutorLimits(
+ executor->info.resources(),
+ executor->queuedTasks.values(),
+ executor->launchedTasks.values()))
.onAny(defer(self(),
&Slave::__statusUpdate,
lambda::_1,
@@ -9980,17 +10014,46 @@ void Slave::initializeResourceProviderManager(
google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
const Resources& executorResources,
- const vector<TaskInfo>& tasks) const
+ const vector<TaskInfo>& taskInfos,
+ const vector<Task*>& tasks) const
{
Option<Value::Scalar> executorCpuLimit, executorMemLimit;
Value::Scalar cpuRequest, memRequest;
- foreach (const TaskInfo& task, tasks) {
+ foreach (const TaskInfo& taskInfo, taskInfos) {
+ // Count the task's CPU limit into the executor's CPU limit.
+ if (taskInfo.limits().count("cpus")) {
+ setLimit(executorCpuLimit, taskInfo.limits().at("cpus"));
+ } else {
+ Option<Value::Scalar> taskCpus =
+ Resources(taskInfo.resources()).get<Value::Scalar>("cpus");
+
+ if (taskCpus.isSome()) {
+ cpuRequest += taskCpus.get();
+ }
+ }
+
+ // Count the task's memory limit into the executor's memory limit.
+ if (taskInfo.limits().count("mem")) {
+ setLimit(executorMemLimit, taskInfo.limits().at("mem"));
+ } else {
+ Option<Value::Scalar> taskMem =
+ Resources(taskInfo.resources()).get<Value::Scalar>("mem");
+
+ if (taskMem.isSome()) {
+ memRequest += taskMem.get();
+ }
+ }
+ }
+
+ foreach (const Task* task, tasks) {
+ CHECK_NOTNULL(task);
+
// Count the task's CPU limit into the executor's CPU limit.
- if (task.limits().count("cpus")) {
- setLimit(executorCpuLimit, task.limits().at("cpus"));
+ if (task->limits().count("cpus")) {
+ setLimit(executorCpuLimit, task->limits().at("cpus"));
} else {
Option<Value::Scalar> taskCpus =
- Resources(task.resources()).get<Value::Scalar>("cpus");
+ Resources(task->resources()).get<Value::Scalar>("cpus");
if (taskCpus.isSome()) {
cpuRequest += taskCpus.get();
@@ -9998,11 +10061,11 @@ google::protobuf::Map<string, Value::Scalar> Slave::computeExecutorLimits(
}
// Count the task's memory limit into the executor's memory limit.
- if (task.limits().count("mem")) {
- setLimit(executorMemLimit, task.limits().at("mem"));
+ if (task->limits().count("mem")) {
+ setLimit(executorMemLimit, task->limits().at("mem"));
} else {
Option<Value::Scalar> taskMem =
- Resources(task.resources()).get<Value::Scalar>("mem");
+ Resources(task->resources()).get<Value::Scalar>("mem");
if (taskMem.isSome()) {
memRequest += taskMem.get();
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index a5a367e..d7e65e0 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -779,9 +779,13 @@ private:
const Flags& flags,
const SlaveID& slaveId);
+ // This function is used to compute limits for executors before they
+ // are launched as well as when updating running executors, so we must
+ // accept both `TaskInfo` and `Task` types to handle both cases.
google::protobuf::Map<std::string, Value::Scalar> computeExecutorLimits(
const Resources& executorResources,
- const std::vector<TaskInfo>& tasks) const;
+ const std::vector<TaskInfo>& taskInfos,
+ const std::vector<Task*>& tasks = {}) const;
protobuf::master::Capabilities requiredMasterCapabilities;