You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/07 17:43:42 UTC

[3/5] mesos git commit: Made `kill` not use pipelining in the default executor.

Made `kill` not use pipelining in the default executor.

The previous code used to pipeline all the `KILL_NESTED_CONTAINER`
calls on the same connection. This change removes this and invokes
`post` for each child container. This simplifies the code and
makes it more readable.

Review: https://reviews.apache.org/r/56267/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c26d577
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c26d577
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c26d577

Branch: refs/heads/master
Commit: 2c26d577203ecc0afc706d3484de80099512e1cb
Parents: 08f4cdd
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 7 09:42:08 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Feb 7 09:42:52 2017 -0800

----------------------------------------------------------------------
 src/launcher/default_executor.cpp | 48 +++++++++++-----------------------
 1 file changed, 15 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2c26d577/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 97eee05..78a4b6e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -717,42 +717,17 @@ protected:
 
     CHECK_EQ(SUBSCRIBED, state);
 
-    process::http::connect(agent)
-      .onAny(defer(self(), &Self::_shutdown, lambda::_1));
-  }
-
-  void _shutdown(const Future<Connection>& connection)
-  {
-    if (!connection.isReady()) {
-      LOG(ERROR)
-        << "Unable to establish connection with the agent: "
-        << (connection.isFailed() ? connection.failure() : "discarded");
-      __shutdown();
-      return;
-    }
-
-    // It is possible that the agent process failed before we could
-    // kill the child containers.
-    if (state == DISCONNECTED || state == CONNECTED) {
-      LOG(ERROR) << "Unable to kill child containers as the "
-                 << "executor is in state " << state;
-      __shutdown();
-      return;
-    }
-
     list<Future<Nothing>> killing;
     foreachkey (const ContainerID& containerId, containers) {
-      killing.push_back(kill(connection.get(), containerId));
+      killing.push_back(kill(containerId));
     }
 
     // It is possible that the agent process can fail while we are
-    // killing child containers. We fail fast if this happens. We
-    // capture `connection` to ensure that the connection is not
-    // disconnected before the responses are complete.
+    // killing child containers. We fail fast if this happens.
     collect(killing)
       .onAny(defer(
           self(),
-          [this, connection](const Future<list<Nothing>>& future) {
+          [this](const Future<list<Nothing>>& future) {
         if (future.isReady()) {
           return;
         }
@@ -779,7 +754,7 @@ protected:
     terminate(self());
   }
 
-  Future<Nothing> kill(Connection connection, const ContainerID& containerId)
+  Future<Nothing> kill(const ContainerID& containerId)
   {
     CHECK_EQ(SUBSCRIBED, state);
     CHECK(containers.contains(containerId));
@@ -794,7 +769,7 @@ protected:
 
     kill->mutable_container_id()->CopyFrom(containerId);
 
-    return post(connection, call)
+    return post(None(), call)
       .then([](const Response& /* response */) {
         return Nothing();
       });
@@ -906,17 +881,24 @@ private:
     mesos->send(evolve(call));
   }
 
-  Future<Response> post(Connection connection, const agent::Call& call)
+  Future<Response> post(
+      Option<Connection> connection,
+      const agent::Call& call)
   {
     ::Request request;
     request.method = "POST";
     request.url = agent;
     request.body = serialize(contentType, evolve(call));
-    request.keepAlive = true;
     request.headers = {{"Accept", stringify(contentType)},
                        {"Content-Type", stringify(contentType)}};
 
-    return connection.send(request);
+    // Only pipeline requests when there is an active connection.
+    if (connection.isSome()) {
+      request.keepAlive = true;
+    }
+
+    return connection.isSome() ? connection->send(request)
+                               : process::http::request(request);
   }
 
   void retry(