You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/07 17:43:44 UTC

[5/5] mesos git commit: Modified active containers key to be `TaskID` in the default executor.

Modified active containers key to be `TaskID` in the default executor.

We look up active containers by `TaskID` more often than the other way
around. Hence, modified the active containers hashmap to be keyed
on `TaskID` instead.

Review: https://reviews.apache.org/r/56308/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2a45dc9e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2a45dc9e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2a45dc9e

Branch: refs/heads/master
Commit: 2a45dc9effd664056d616bcdc843d3a4c1c26548
Parents: 2c26d57
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 7 09:42:14 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Feb 7 09:42:52 2017 -0800

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 71 +++++++++++++++-------------------
 1 file changed, 31 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2a45dc9e/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 78a4b6e..57d6349 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -389,7 +389,7 @@ protected:
       ContainerID containerId = pending.front();
 
       tasks[taskId] = task;
-      containers[containerId] = taskId;
+      containers[taskId] = containerId;
 
       pending.pop_front();
 
@@ -450,8 +450,8 @@ protected:
     }
 
     LOG(INFO)
-      << "Successfully launched child containers "
-      << stringify(containers.keys()) << " for tasks "
+      << "Successfully launched tasks "
+      << stringify(containers.keys()) << " in child containers "
       << stringify(containers.values());
 
     wait();
@@ -498,13 +498,13 @@ protected:
     list<Connection> connections = _connections.get();
     CHECK_EQ(containers.size(), connections.size());
 
-    foreachkey (const ContainerID& containerId, containers) {
-      CHECK(!waiting.contains(containerId));
+    foreachkey (const TaskID& taskId, containers) {
+      CHECK(!waiting.contains(taskId));
 
       __wait(connectionId.get(),
              connections.front(),
-             containers[containerId],
-             containerId);
+             taskId,
+             containers[taskId]);
 
       connections.pop_front();
     }
@@ -523,12 +523,12 @@ protected:
 
     CHECK_EQ(SUBSCRIBED, state);
     CHECK_SOME(connectionId);
-    CHECK(!waiting.contains(containerId));
+    CHECK(!waiting.contains(taskId));
 
     LOG(INFO) << "Waiting for child container " << containerId
               << " of task '" << taskId << "'";
 
-    waiting.put(containerId, connection);
+    waiting.put(taskId, connection);
 
     agent::Call call;
     call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
@@ -565,12 +565,12 @@ protected:
     }
 
     CHECK_EQ(SUBSCRIBED, state);
-    CHECK(waiting.contains(containerId));
-    CHECK(waiting.get(containerId) == connection);
+    CHECK(waiting.contains(taskId));
+    CHECK(waiting.get(taskId) == connection);
 
     auto retry_ = [this, connection, taskId, containerId]() mutable {
       connection.disconnect();
-      waiting.erase(containerId);
+      waiting.erase(taskId);
       retry(connectionId.get(), taskId, containerId);
     };
 
@@ -653,8 +653,8 @@ protected:
       update(taskId, taskState, message, None());
     }
 
-    CHECK(containers.contains(containerId));
-    containers.erase(containerId);
+    CHECK(containers.contains(taskId));
+    containers.erase(taskId);
 
     LOG(INFO)
       << "Child container " << containerId << " of task '" << taskId
@@ -718,8 +718,8 @@ protected:
     CHECK_EQ(SUBSCRIBED, state);
 
     list<Future<Nothing>> killing;
-    foreachkey (const ContainerID& containerId, containers) {
-      killing.push_back(kill(containerId));
+    foreachkey (const TaskID& taskId, containers) {
+      killing.push_back(kill(taskId));
     }
 
     // It is possible that the agent process can fail while we are
@@ -754,10 +754,12 @@ protected:
     terminate(self());
   }
 
-  Future<Nothing> kill(const ContainerID& containerId)
+  Future<Nothing> kill(const TaskID& taskId)
   {
     CHECK_EQ(SUBSCRIBED, state);
-    CHECK(containers.contains(containerId));
+    CHECK(containers.contains(taskId));
+
+    const ContainerID& containerId = containers.at(taskId);
 
     LOG(INFO) << "Killing child container " << containerId;
 
@@ -789,15 +791,7 @@ protected:
 
     LOG(INFO) << "Received kill for task '" << taskId << "'";
 
-    bool found = false;
-    foreachvalue (const TaskID& taskId_, containers) {
-      if (taskId_ == taskId) {
-        found = true;
-        break;
-      }
-    }
-
-    if (!found) {
+    if (!containers.contains(taskId)) {
       LOG(WARNING) << "Ignoring kill for task '" << taskId
                    << "' as it is no longer active";
       return;
@@ -854,17 +848,14 @@ private:
       status.set_healthy(healthy.get());
     }
 
-    // Fill the container ID associated with this task.
-    // TODO(jieyu): Consider maintain a hashmap between TaskID to
-    // ContainerID so that we don't have to loop through all tasks.
-    foreachpair (const ContainerID& containerId,
-                 const TaskID& containerTaskId,
-                 containers) {
-      if (containerTaskId == taskId) {
-        ContainerStatus* containerStatus = status.mutable_container_status();
-        containerStatus->mutable_container_id()->CopyFrom(containerId);
-        break;
-      }
+    // Fill the container ID associated with this task. It might be
+    // possible that we don't have an active container associated with
+    // the task (e.g., when the scheduler tried to launch multiple task groups).
+    if (containers.contains(taskId)) {
+      const ContainerID& containerId = containers.at(taskId);
+
+      ContainerStatus* containerStatus = status.mutable_container_status();
+      containerStatus->mutable_container_id()->CopyFrom(containerId);
     }
 
     Call call;
@@ -1001,13 +992,13 @@ private:
   // TODO(anand): Consider creating a `Container` struct to manage
   // information about an active container and its waiting connection.
 
-  LinkedHashMap<ContainerID, TaskID> containers; // Active child containers.
+  LinkedHashMap<TaskID, ContainerID> containers; // Active child containers.
 
   // Connections used for waiting on child containers. A child container
   // can be active and present in `containers` but not present
   // in `waiting` if a connection for sending the `WAIT_NESTED_CONTAINER`
   // call has not been established yet.
-  hashmap<ContainerID, Connection> waiting;
+  hashmap<TaskID, Connection> waiting;
 
   // There can be multiple simulataneous ongoing (re-)connection attempts
   // with the agent for waiting on child containers. This helps us in