You are viewing a plain-text version of this content. The link to the canonical version was lost in the plain-text conversion.
Posted to commits@mesos.apache.org by bm...@apache.org on 2017/07/18 01:39:14 UTC
[1/2] mesos git commit: Added a method in the agent exposing an executor's allocated resources.
Repository: mesos
Updated Branches:
refs/heads/master 92896871b -> f10ee7e63
Added a method in the agent exposing an executor's allocated resources.
This allows callers to obtain the executor's allocated resources
through a single method call, instead of each call site having to
iterate over the executor's queued and launched tasks.
Review: https://reviews.apache.org/r/60867/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/38a26842
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/38a26842
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/38a26842
Branch: refs/heads/master
Commit: 38a268428160373444187911d121dafc66023fc1
Parents: 9289687
Author: Andrei Budnik <ab...@mesosphere.com>
Authored: Mon Jul 17 17:41:17 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Mon Jul 17 17:41:17 2017 -0700
----------------------------------------------------------------------
src/slave/slave.cpp | 16 ++++++++++++++++
src/slave/slave.hpp | 2 ++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/38a26842/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index adbe65f..2f25c0c 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7836,6 +7836,22 @@ Option<TaskGroupInfo> Executor::getQueuedTaskGroup(const TaskID& taskId)
}
+Resources Executor::allocatedResources() const
+{
+ Resources allocatedResources = info.resources(); // Executor's own (may be empty for command executors).
+ // Queued tasks; `queuedTasks` also holds copies of the tasks in `queuedTaskGroups`.
+ foreachvalue (const TaskInfo& task, queuedTasks) {
+ allocatedResources += task.resources();
+ }
+ // Launched tasks; `updateTaskState` erases a task from `launchedTasks` once it is terminal.
+ foreachvalue (const Task* task, launchedTasks) {
+ allocatedResources += task->resources();
+ }
+
+ return allocatedResources;
+}
+
+
map<string, string> executorEnvironment(
const Flags& flags,
const ExecutorInfo& executorInfo,
http://git-wip-us.apache.org/repos/asf/mesos/blob/38a26842/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 8bb03ec..094f464 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -749,6 +749,8 @@ public:
// Returns the task group associated with the task.
Option<TaskGroupInfo> getQueuedTaskGroup(const TaskID& taskId);
+ Resources allocatedResources() const;
+
enum State
{
REGISTERING, // Executor is launched but not (re-)registered yet.
[2/2] mesos git commit: Removed Executor::resources usage in the agent.
Posted by bm...@apache.org.
Removed Executor::resources usage in the agent.
We have now added Executor::allocatedResources(), which obviates the
need for Executor::resources. Note that some call sites that accessed
Executor::resources forgot to include the queued tasks, so this patch
also fixes the following bugs:
(1) The metrics / endpoint information about allocated resources
omitted the queued tasks.
(2) There was a race: `Slave::__run` calls `containerizer->update`
with resources that include the queued tasks, but if
`Slave::_statusUpdate` runs immediately afterwards and calls
`containerizer->update` without the queued tasks, the executor's
container ends up sized below what it should be.
Review: https://reviews.apache.org/r/60907/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f10ee7e6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f10ee7e6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f10ee7e6
Branch: refs/heads/master
Commit: f10ee7e636fcb805c0cd78b428e705e30860e4ee
Parents: 38a2684
Author: Andrei Budnik <ab...@mesosphere.com>
Authored: Mon Jul 17 17:43:47 2017 -0700
Committer: Benjamin Mahler <bm...@apache.org>
Committed: Mon Jul 17 17:43:47 2017 -0700
----------------------------------------------------------------------
src/slave/http.cpp | 2 +-
src/slave/slave.cpp | 77 ++++++++++++------------------------------------
src/slave/slave.hpp | 3 --
3 files changed, 20 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/http.cpp
----------------------------------------------------------------------
diff --git a/src/slave/http.cpp b/src/slave/http.cpp
index 60640e5..5c23bfe 100644
--- a/src/slave/http.cpp
+++ b/src/slave/http.cpp
@@ -185,7 +185,7 @@ struct ExecutorWriter
writer->field("source", executor_->info.source());
writer->field("container", executor_->containerId.value());
writer->field("directory", executor_->directory);
- writer->field("resources", executor_->resources);
+ writer->field("resources", executor_->allocatedResources());
// Resources may be empty for command executors.
if (!executor_->info.resources().empty()) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2f25c0c..7381530 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -2416,17 +2416,9 @@ void Slave::__run(
LOG(INFO) << "Queued " << taskOrTaskGroup(task, taskGroup)
<< " for executor " << *executor;
- // Update the resource limits for the container. Note that the
- // resource limits include the currently queued tasks because we
- // want the container to have enough resources to hold the
- // upcoming tasks.
- Resources resources = executor->resources;
-
- foreachvalue (const TaskInfo& _task, executor->queuedTasks) {
- resources += _task.resources();
- }
-
- containerizer->update(executor->containerId, resources)
+ containerizer->update(
+ executor->containerId,
+ executor->allocatedResources())
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -3790,16 +3782,6 @@ void Slave::subscribe(
None());
}
- // Update the resource limits for the container. Note that the
- // resource limits include the currently queued tasks because we
- // want the container to have enough resources to hold the
- // upcoming tasks.
- Resources resources = executor->resources;
-
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
- resources += task.resources();
- }
-
// We maintain a copy of the tasks in `queuedTaskGroups` also in
// `queuedTasks`. Hence, we need to ensure that we don't send the same
// tasks to the executor twice.
@@ -3819,7 +3801,9 @@ void Slave::subscribe(
}
}
- containerizer->update(executor->containerId, resources)
+ containerizer->update(
+ executor->containerId,
+ executor->allocatedResources())
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -4012,16 +3996,6 @@ void Slave::registerExecutor(
message.mutable_slave_info()->MergeFrom(info);
executor->send(message);
- // Update the resource limits for the container. Note that the
- // resource limits include the currently queued tasks because we
- // want the container to have enough resources to hold the
- // upcoming tasks.
- Resources resources = executor->resources;
-
- foreachvalue (const TaskInfo& task, executor->queuedTasks) {
- resources += task.resources();
- }
-
// We maintain a copy of the tasks in `queuedTaskGroups` also in
// `queuedTasks`. Hence, we need to ensure that we don't send the same
// tasks to the executor twice.
@@ -4041,7 +4015,9 @@ void Slave::registerExecutor(
}
}
- containerizer->update(executor->containerId, resources)
+ containerizer->update(
+ executor->containerId,
+ executor->allocatedResources())
.onAny(defer(self(),
&Self::___run,
lambda::_1,
@@ -4179,7 +4155,9 @@ void Slave::reregisterExecutor(
}
// Tell the containerizer to update the resources.
- containerizer->update(executor->containerId, executor->resources)
+ containerizer->update(
+ executor->containerId,
+ executor->allocatedResources())
.onAny(defer(self(),
&Self::_reregisterExecutor,
lambda::_1,
@@ -4623,7 +4601,7 @@ void Slave::_statusUpdate(
// have been updated before sending the status update. Note that
// duplicate terminal updates are not possible here because they
// lead to an error from `Executor::updateTaskState`.
- containerizer->update(executor->containerId, executor->resources)
+ containerizer->update(executor->containerId, executor->allocatedResources())
.onAny(defer(self(),
&Slave::__statusUpdate,
lambda::_1,
@@ -6445,9 +6423,8 @@ void Slave::_forwardOversubscribed(const Future<Resources>& oversubscribable)
// calculating the available oversubscribed resources to offer.
Resources oversubscribed;
foreachvalue (Framework* framework, frameworks) {
- foreachvalue (Executor* executor, framework->executors) {
- oversubscribed += unallocated(executor->resources.revocable());
- }
+ oversubscribed += unallocated(
+ framework->allocatedResources().revocable());
}
// Add oversubscribable resources to the total.
@@ -6647,7 +6624,7 @@ Future<ResourceUsage> Slave::usage()
ResourceUsage::Executor* entry = usage->add_executors();
entry->mutable_executor_info()->CopyFrom(executor->info);
- entry->mutable_allocated()->CopyFrom(executor->resources);
+ entry->mutable_allocated()->CopyFrom(executor->allocatedResources());
entry->mutable_container_id()->CopyFrom(executor->containerId);
// We include non-terminal tasks in ResourceUsage.
@@ -7012,9 +6989,7 @@ double Slave::_resources_used(const string& name)
Resources used;
foreachvalue (Framework* framework, frameworks) {
- foreachvalue (Executor* executor, framework->executors) {
- used += executor->resources.nonRevocable();
- }
+ used += framework->allocatedResources().nonRevocable();
}
return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
@@ -7056,9 +7031,7 @@ double Slave::_resources_revocable_used(const string& name)
Resources used;
foreachvalue (Framework* framework, frameworks) {
- foreachvalue (Executor* executor, framework->executors) {
- used += executor->resources.revocable();
- }
+ used += framework->allocatedResources().revocable();
}
return used.get<Value::Scalar>(name).getOrElse(Value::Scalar()).value();
@@ -7484,9 +7457,7 @@ Resources Framework::allocatedResources() const
Resources allocated;
foreachvalue (const Executor* executor, executors) {
- // TODO(abudnik): Currently `Executor::resources` does not include
- // the executor's queued tasks!
- allocated += executor->resources;
+ allocated += executor->allocatedResources();
}
hashset<ExecutorID> pendingExecutors;
@@ -7530,7 +7501,6 @@ Executor::Executor(
checkpoint(_checkpoint),
http(None()),
pid(None()),
- resources(_info.resources()),
completedTasks(MAX_COMPLETED_TASKS_PER_EXECUTOR)
{
CHECK_NOTNULL(slave);
@@ -7579,8 +7549,6 @@ Task* Executor::addTask(const TaskInfo& task)
launchedTasks[task.task_id()] = t;
- resources += task.resources();
-
return t;
}
@@ -7660,12 +7628,6 @@ void Executor::recoverTask(const TaskState& state, bool recheckpointTask)
launchedTasks[state.id] = task;
- // NOTE: Since some tasks might have been terminated when the
- // slave was down, the executor resources we capture here is an
- // upper-bound. The actual resources needed (for live tasks) by
- // the isolator will be calculated when the executor re-registers.
- resources += state.info->resources();
-
// Read updates to get the latest state of the task.
foreach (const StatusUpdate& update, state.updates) {
Try<Nothing> updated = updateTaskState(update.status());
@@ -7754,7 +7716,6 @@ Try<Nothing> Executor::updateTaskState(const TaskStatus& status)
task = launchedTasks.at(status.task_id());
if (terminal) {
- resources -= task->resources(); // Release the resources.
launchedTasks.erase(taskId);
}
} else if (terminatedTasks.contains(taskId)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/f10ee7e6/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index 094f464..1fe93da 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -795,9 +795,6 @@ public:
Option<HttpConnection> http;
Option<process::UPID> pid;
- // Currently consumed resources.
- Resources resources;
-
// Tasks can be found in one of the following four data structures:
//
// TODO(bmahler): Make these private to enforce that the task