Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/07 17:43:41 UTC

[2/5] mesos git commit: Introduced a `Container` struct on the default executor.

Introduced a `Container` struct on the default executor.

This helps us consolidate the various metadata associated with
an active child container. It will also be used in the future to
figure out the task group corresponding to a child container and
then kill the other child containers belonging to that task group
when one of its tasks fails or is explicitly killed by the
scheduler.
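
For reference, the struct introduced by this patch looks roughly as
follows, together with a sketch of the future use described above. The
`killSiblings` helper is a hypothetical illustration (it assumes the
executor's `containers` map and a `kill(TaskID)` helper); it is not part
of this commit:

  // The struct added by this patch (see the diff below).
  struct Container
  {
    ContainerID containerId;
    TaskID taskId;
    TaskGroupInfo taskGroup; // Task group of the child container.

    // Connection used for waiting on the child container. It may be
    // unset if a connection for sending the `WAIT_NESTED_CONTAINER`
    // call has not been established yet.
    Option<Connection> waiting;
  };

  // Hypothetical follow-up sketch: when one task in a group terminates,
  // kill the remaining child containers belonging to the same group.
  void killSiblings(const TaskID& failedTaskId)
  {
    foreachvalue (const Owned<Container>& container, containers) {
      if (container->taskId == failedTaskId) {
        continue; // Skip the task that already terminated.
      }

      // The container is a sibling if the failed task appears in its
      // `TaskGroupInfo`.
      foreach (const TaskInfo& task, container->taskGroup.tasks()) {
        if (task.task_id() == failedTaskId) {
          kill(container->taskId); // Assumes a `kill(TaskID)` helper.
          break;
        }
      }
    }
  }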

Review: https://reviews.apache.org/r/56269/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/11fb9346
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/11fb9346
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/11fb9346

Branch: refs/heads/master
Commit: 11fb93466f192b551d711bd359f070fcd4dc5417
Parents: 2a45dc9
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 7 09:42:17 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Feb 7 09:42:52 2017 -0800

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 156 ++++++++++++++++-----------------
 1 file changed, 77 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/11fb9346/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 57d6349..327510e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -79,6 +79,21 @@ namespace internal {
 
 class DefaultExecutor : public ProtobufProcess<DefaultExecutor>
 {
+private:
+  // Represents a child container. This is defined here since
+  // C++ does not allow forward declaring nested classes.
+  struct Container
+  {
+    ContainerID containerId;
+    TaskID taskId;
+    TaskGroupInfo taskGroup; // Task group of the child container.
+
+    // Connection used for waiting on the child container. It is possible
+    // that a container is active but a connection for sending the
+    // `WAIT_NESTED_CONTAINER` call has not been established yet.
+    Option<Connection> waiting;
+  };
+
 public:
   DefaultExecutor(
       const FrameworkID& _frameworkId,
@@ -117,13 +132,14 @@ public:
     state = DISCONNECTED;
     connectionId = None();
 
-    // Disconnect all active connections used for waiting on child
-    // containers.
-    foreachvalue (Connection connection, waiting) {
-      connection.disconnect();
+    // Disconnect all active connections used for
+    // waiting on child containers.
+    foreachvalue (Owned<Container> container, containers) {
+      if (container->waiting.isSome()) {
+        container->waiting->disconnect();
+        container->waiting = None();
+      }
     }
-
-    waiting.clear();
   }
 
   void received(const Event& event)
@@ -299,7 +315,7 @@ protected:
     CHECK_EQ(SUBSCRIBED, state);
     CHECK_SOME(executorContainerId);
 
-    list<ContainerID> pending;
+    list<ContainerID> containerIds;
     list<Future<Response>> responses;
 
     foreach (const TaskInfo& task, taskGroup.tasks()) {
@@ -307,7 +323,7 @@ protected:
       containerId.set_value(UUID::random().toString());
       containerId.mutable_parent()->CopyFrom(executorContainerId.get());
 
-      pending.push_back(containerId);
+      containerIds.push_back(containerId);
 
       agent::Call call;
       call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER);
@@ -332,14 +348,14 @@ protected:
       .onAny(defer(self(),
                    &Self::__launchGroup,
                    taskGroup,
-                   pending,
+                   containerIds,
                    connection.get(),
                    lambda::_1));
   }
 
   void __launchGroup(
       const TaskGroupInfo& taskGroup,
-      list<ContainerID> pending,
+      const list<ContainerID>& containerIds,
       const Connection& connection,
       const Future<list<Response>>& responses)
   {
@@ -383,15 +399,16 @@ protected:
 
     CHECK_EQ(SUBSCRIBED, state);
     CHECK(launched);
+    CHECK_EQ(containerIds.size(), (size_t) taskGroup.tasks().size());
 
-    foreach (const TaskInfo& task, taskGroup.tasks()) {
+    size_t index = 0;
+    foreach (const ContainerID& containerId, containerIds) {
+      const TaskInfo& task = taskGroup.tasks().Get(index++);
       const TaskID& taskId = task.task_id();
-      ContainerID containerId = pending.front();
 
       tasks[taskId] = task;
-      containers[taskId] = containerId;
-
-      pending.pop_front();
+      containers[taskId] = Owned<Container>(
+          new Container {containerId, taskId, taskGroup, None()});
 
       if (task.has_health_check()) {
         // TODO(anand): Add support for command health checks.
@@ -449,10 +466,11 @@ protected:
       update(task.task_id(), TASK_RUNNING);
     }
 
+
     LOG(INFO)
       << "Successfully launched tasks "
       << stringify(containers.keys()) << " in child containers "
-      << stringify(containers.values());
+      << stringify(containerIds);
 
     wait();
   }
@@ -496,16 +514,10 @@ protected:
     CHECK_SOME(connectionId);
 
     list<Connection> connections = _connections.get();
-    CHECK_EQ(containers.size(), connections.size());
 
+    CHECK_EQ(containers.size(), connections.size());
     foreachkey (const TaskID& taskId, containers) {
-      CHECK(!waiting.contains(taskId));
-
-      __wait(connectionId.get(),
-             connections.front(),
-             taskId,
-             containers[taskId]);
-
+      __wait(connectionId.get(), connections.front(), taskId);
       connections.pop_front();
     }
   }
@@ -513,8 +525,7 @@ protected:
   void __wait(
       const UUID& _connectionId,
       const Connection& connection,
-      const TaskID& taskId,
-      const ContainerID& containerId)
+      const TaskID& taskId)
   {
     if (connectionId != _connectionId) {
       VLOG(1) << "Ignoring the wait operation from a stale connection";
@@ -523,12 +534,15 @@ protected:
 
     CHECK_EQ(SUBSCRIBED, state);
     CHECK_SOME(connectionId);
-    CHECK(!waiting.contains(taskId));
+    CHECK(containers.contains(taskId));
+
+    Owned<Container> container = containers.at(taskId);
 
-    LOG(INFO) << "Waiting for child container " << containerId
+    LOG(INFO) << "Waiting for child container " << container->containerId
               << " of task '" << taskId << "'";
 
-    waiting.put(taskId, connection);
+    CHECK_NONE(container->waiting);
+    container->waiting = connection;
 
     agent::Call call;
     call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
@@ -536,24 +550,20 @@ protected:
     agent::Call::WaitNestedContainer* containerWait =
       call.mutable_wait_nested_container();
 
-    containerWait->mutable_container_id()->CopyFrom(containerId);
+    containerWait->mutable_container_id()->CopyFrom(container->containerId);
 
     Future<Response> response = post(connection, call);
     response
       .onAny(defer(self(),
                    &Self::waited,
                    connectionId.get(),
-                   connection,
                    taskId,
-                   containerId,
                    lambda::_1));
   }
 
   void waited(
       const UUID& _connectionId,
-      Connection connection,
       const TaskID& taskId,
-      const ContainerID& containerId,
       const Future<Response>& response)
   {
     // It is possible that this callback executed after the agent process
@@ -565,13 +575,16 @@ protected:
     }
 
     CHECK_EQ(SUBSCRIBED, state);
-    CHECK(waiting.contains(taskId));
-    CHECK(waiting.get(taskId) == connection);
+    CHECK(containers.contains(taskId));
+
+    Owned<Container> container = containers.at(taskId);
 
-    auto retry_ = [this, connection, taskId, containerId]() mutable {
-      connection.disconnect();
-      waiting.erase(taskId);
-      retry(connectionId.get(), taskId, containerId);
+    CHECK_SOME(container->waiting);
+
+    auto retry_ = [this, container]() mutable {
+      container->waiting->disconnect();
+      container->waiting = None();
+      retry(connectionId.get(), container->taskId);
     };
 
     // It is possible that the response failed due to a network blip
@@ -580,7 +593,7 @@ protected:
     if (!response.isReady()) {
       LOG(ERROR)
         << "Connection for waiting on child container "
-        << containerId << " of task '" << taskId << "' interrupted: "
+        << container->containerId << " of task '" << taskId << "' interrupted: "
         << (response.isFailed() ? response.failure() : "discarded");
       retry_();
       return;
@@ -593,7 +606,7 @@ protected:
     if (response->code == process::http::Status::SERVICE_UNAVAILABLE) {
       LOG(WARNING) << "Received '" << response->status << "' ("
                    << response->body << ") waiting on child container "
-                   << containerId << " of task '" << taskId << "'";
+                   << container->containerId << " of task '" << taskId << "'";
       retry_();
       return;
     }
@@ -603,7 +616,7 @@ protected:
     if (response->code != process::http::Status::OK) {
       LOG(ERROR) << "Received '" << response->status << "' ("
                  << response->body << ") waiting on child container "
-                 << containerId << " of task '" << taskId << "'";
+                 << container->containerId << " of task '" << taskId << "'";
       __shutdown();
       return;
     }
@@ -657,7 +670,7 @@ protected:
     containers.erase(taskId);
 
     LOG(INFO)
-      << "Child container " << containerId << " of task '" << taskId
+      << "Child container " << container->containerId << " of task '" << taskId
       << "' in state " << stringify(taskState) << " terminated with status "
       << (status.isSome() ? WSTRINGIFY(status.get()) : "unknown");
 
@@ -759,9 +772,9 @@ protected:
     CHECK_EQ(SUBSCRIBED, state);
     CHECK(containers.contains(taskId));
 
-    const ContainerID& containerId = containers.at(taskId);
+    const Owned<Container>& container = containers.at(taskId);
 
-    LOG(INFO) << "Killing child container " << containerId;
+    LOG(INFO) << "Killing child container " << container->containerId;
 
     agent::Call call;
     call.set_type(agent::Call::KILL_NESTED_CONTAINER);
@@ -769,7 +782,7 @@ protected:
     agent::Call::KillNestedContainer* kill =
       call.mutable_kill_nested_container();
 
-    kill->mutable_container_id()->CopyFrom(containerId);
+    kill->mutable_container_id()->CopyFrom(container->containerId);
 
     return post(None(), call)
       .then([](const Response& /* response */) {
@@ -852,10 +865,10 @@ private:
     // possible that we don't have an active container associated with
     // the task (e.g., when the scheduler tried to launch multiple task groups).
     if (containers.contains(taskId)) {
-      const ContainerID& containerId = containers.at(taskId);
+      const Owned<Container>& container = containers.at(taskId);
 
       ContainerStatus* containerStatus = status.mutable_container_status();
-      containerStatus->mutable_container_id()->CopyFrom(containerId);
+      containerStatus->mutable_container_id()->CopyFrom(container->containerId);
     }
 
     Call call;
@@ -892,10 +905,7 @@ private:
                                : process::http::request(request);
   }
 
-  void retry(
-      const UUID& _connectionId,
-      const TaskID& taskId,
-      const ContainerID& containerId)
+  void retry(const UUID& _connectionId, const TaskID& taskId)
   {
     if (connectionId != _connectionId) {
       VLOG(1) << "Ignoring retry attempt from a stale connection";
@@ -909,15 +919,13 @@ private:
                    &Self::_retry,
                    lambda::_1,
                    connectionId.get(),
-                   taskId,
-                   containerId));
+                   taskId));
   }
 
   void _retry(
       const Future<Connection>& connection,
       const UUID& _connectionId,
-      const TaskID& taskId,
-      const ContainerID& containerId)
+      const TaskID& taskId)
   {
     const Duration duration = Seconds(1);
 
@@ -928,29 +936,27 @@ private:
 
     CHECK_EQ(SUBSCRIBED, state);
     CHECK_SOME(connectionId);
+    CHECK(containers.contains(taskId));
+
+    const Owned<Container>& container = containers.at(taskId);
 
     if (!connection.isReady()) {
       LOG(ERROR)
         << "Unable to establish connection with the agent ("
         << (connection.isFailed() ? connection.failure() : "discarded")
-        << ") for waiting on child container " << containerId
+        << ") for waiting on child container " << container->containerId
         << " of task '" << taskId << "'; Retrying again in " << duration;
 
       process::delay(
-          duration,
-          self(),
-          &Self::retry,
-          connectionId.get(),
-          taskId,
-          containerId);
+          duration, self(), &Self::retry, connectionId.get(), taskId);
 
       return;
     }
 
     LOG(INFO)
-      << "Established connection to wait for child container " << containerId
-      << " of task '" << taskId << "'; Retrying the WAIT_NESTED_CONTAINER call "
-      << "in " << duration;
+      << "Established connection to wait for child container "
+      << container->containerId << " of task '" << taskId
+      << "'; Retrying the WAIT_NESTED_CONTAINER call " << "in " << duration;
 
     // It is possible that we were able to reestablish the connection
     // but the agent might still be recovering. To avoid the vicious
@@ -963,8 +969,7 @@ private:
         &Self::__wait,
         connectionId.get(),
         connection.get(),
-        taskId,
-        containerId);
+        taskId);
   }
 
   enum State
@@ -989,16 +994,8 @@ private:
   LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
   LinkedHashMap<TaskID, TaskInfo> tasks; // Unacknowledged tasks.
 
-  // TODO(anand): Consider creating a `Container` struct to manage
-  // information about an active container and its waiting connection.
-
-  LinkedHashMap<TaskID, ContainerID> containers; // Active child containers.
-
-  // Connections used for waiting on child containers. A child container
-  // can be active and present in `containers` but not present
-  // in `waiting` if a connection for sending the `WAIT_NESTED_CONTAINER`
-  // call has not been established yet.
-  hashmap<TaskID, Connection> waiting;
+  // Active child containers.
+  LinkedHashMap<TaskID, Owned<Container>> containers;
 
   // There can be multiple simulataneous ongoing (re-)connection attempts
   // with the agent for waiting on child containers. This helps us in
@@ -1007,6 +1004,7 @@ private:
   // a `connected()` callback.
   Option<UUID> connectionId;
 
+  // TODO(anand): Move the health checker information to the `Container` struct.
   hashmap<TaskID, Owned<checks::HealthChecker>> checkers; // Health checkers.
 };