You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/07 17:43:44 UTC
[5/5] mesos git commit: Modified active containers key to be `TaskID`
in the default executor.
Modified active containers key to be `TaskID` in the default executor.
We have more occurences of look up by `TaskID` then the other way
around. Hence, modified the active containers hashmap to be keyed
on `TaskID` instead.
Review: https://reviews.apache.org/r/56308/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a45dc9e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a45dc9e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a45dc9e
Branch: refs/heads/master
Commit: 2a45dc9effd664056d616bcdc843d3a4c1c26548
Parents: 2c26d57
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 7 09:42:14 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Feb 7 09:42:52 2017 -0800
----------------------------------------------------------------------
src/launcher/default_executor.cpp | 71 +++++++++++++++-------------------
1 file changed, 31 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2a45dc9e/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 78a4b6e..57d6349 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -389,7 +389,7 @@ protected:
ContainerID containerId = pending.front();
tasks[taskId] = task;
- containers[containerId] = taskId;
+ containers[taskId] = containerId;
pending.pop_front();
@@ -450,8 +450,8 @@ protected:
}
LOG(INFO)
- << "Successfully launched child containers "
- << stringify(containers.keys()) << " for tasks "
+ << "Successfully launched tasks "
+ << stringify(containers.keys()) << " in child containers "
<< stringify(containers.values());
wait();
@@ -498,13 +498,13 @@ protected:
list<Connection> connections = _connections.get();
CHECK_EQ(containers.size(), connections.size());
- foreachkey (const ContainerID& containerId, containers) {
- CHECK(!waiting.contains(containerId));
+ foreachkey (const TaskID& taskId, containers) {
+ CHECK(!waiting.contains(taskId));
__wait(connectionId.get(),
connections.front(),
- containers[containerId],
- containerId);
+ taskId,
+ containers[taskId]);
connections.pop_front();
}
@@ -523,12 +523,12 @@ protected:
CHECK_EQ(SUBSCRIBED, state);
CHECK_SOME(connectionId);
- CHECK(!waiting.contains(containerId));
+ CHECK(!waiting.contains(taskId));
LOG(INFO) << "Waiting for child container " << containerId
<< " of task '" << taskId << "'";
- waiting.put(containerId, connection);
+ waiting.put(taskId, connection);
agent::Call call;
call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
@@ -565,12 +565,12 @@ protected:
}
CHECK_EQ(SUBSCRIBED, state);
- CHECK(waiting.contains(containerId));
- CHECK(waiting.get(containerId) == connection);
+ CHECK(waiting.contains(taskId));
+ CHECK(waiting.get(taskId) == connection);
auto retry_ = [this, connection, taskId, containerId]() mutable {
connection.disconnect();
- waiting.erase(containerId);
+ waiting.erase(taskId);
retry(connectionId.get(), taskId, containerId);
};
@@ -653,8 +653,8 @@ protected:
update(taskId, taskState, message, None());
}
- CHECK(containers.contains(containerId));
- containers.erase(containerId);
+ CHECK(containers.contains(taskId));
+ containers.erase(taskId);
LOG(INFO)
<< "Child container " << containerId << " of task '" << taskId
@@ -718,8 +718,8 @@ protected:
CHECK_EQ(SUBSCRIBED, state);
list<Future<Nothing>> killing;
- foreachkey (const ContainerID& containerId, containers) {
- killing.push_back(kill(containerId));
+ foreachkey (const TaskID& taskId, containers) {
+ killing.push_back(kill(taskId));
}
// It is possible that the agent process can fail while we are
@@ -754,10 +754,12 @@ protected:
terminate(self());
}
- Future<Nothing> kill(const ContainerID& containerId)
+ Future<Nothing> kill(const TaskID& taskId)
{
CHECK_EQ(SUBSCRIBED, state);
- CHECK(containers.contains(containerId));
+ CHECK(containers.contains(taskId));
+
+ const ContainerID& containerId = containers.at(taskId);
LOG(INFO) << "Killing child container " << containerId;
@@ -789,15 +791,7 @@ protected:
LOG(INFO) << "Received kill for task '" << taskId << "'";
- bool found = false;
- foreachvalue (const TaskID& taskId_, containers) {
- if (taskId_ == taskId) {
- found = true;
- break;
- }
- }
-
- if (!found) {
+ if (!containers.contains(taskId)) {
LOG(WARNING) << "Ignoring kill for task '" << taskId
<< "' as it is no longer active";
return;
@@ -854,17 +848,14 @@ private:
status.set_healthy(healthy.get());
}
- // Fill the container ID associated with this task.
- // TODO(jieyu): Consider maintain a hashmap between TaskID to
- // ContainerID so that we don't have to loop through all tasks.
- foreachpair (const ContainerID& containerId,
- const TaskID& containerTaskId,
- containers) {
- if (containerTaskId == taskId) {
- ContainerStatus* containerStatus = status.mutable_container_status();
- containerStatus->mutable_container_id()->CopyFrom(containerId);
- break;
- }
+ // Fill the container ID associated with this task. It might be
+ // possible that we don't have an active container associated with
+ // the task (e.g., when the scheduler tried to launch multiple task groups).
+ if (containers.contains(taskId)) {
+ const ContainerID& containerId = containers.at(taskId);
+
+ ContainerStatus* containerStatus = status.mutable_container_status();
+ containerStatus->mutable_container_id()->CopyFrom(containerId);
}
Call call;
@@ -1001,13 +992,13 @@ private:
// TODO(anand): Consider creating a `Container` struct to manage
// information about an active container and its waiting connection.
- LinkedHashMap<ContainerID, TaskID> containers; // Active child containers.
+ LinkedHashMap<TaskID, ContainerID> containers; // Active child containers.
// Connections used for waiting on child containers. A child container
// can be active and present in `containers` but not present
// in `waiting` if a connection for sending the `WAIT_NESTED_CONTAINER`
// call has not been established yet.
- hashmap<ContainerID, Connection> waiting;
+ hashmap<TaskID, Connection> waiting;
// There can be multiple simulataneous ongoing (re-)connection attempts
// with the agent for waiting on child containers. This helps us in