You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by an...@apache.org on 2017/02/07 17:43:42 UTC
[3/5] mesos git commit: Made `kill` not use pipelining in the default
executor.
Made `kill` not use pipelining in the default executor.
The previous code used to pipeline all the `KILL_NESTED_CONTAINER`
calls on the same connection. This change removes this and invokes
`post` for each child container. This simplifies the code and
makes it more readable.
Review: https://reviews.apache.org/r/56267/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2c26d577
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2c26d577
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2c26d577
Branch: refs/heads/master
Commit: 2c26d577203ecc0afc706d3484de80099512e1cb
Parents: 08f4cdd
Author: Anand Mazumdar <an...@apache.org>
Authored: Tue Feb 7 09:42:08 2017 -0800
Committer: Anand Mazumdar <an...@apache.org>
Committed: Tue Feb 7 09:42:52 2017 -0800
----------------------------------------------------------------------
src/launcher/default_executor.cpp | 48 +++++++++++-----------------------
1 file changed, 15 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2c26d577/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 97eee05..78a4b6e 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -717,42 +717,17 @@ protected:
CHECK_EQ(SUBSCRIBED, state);
- process::http::connect(agent)
- .onAny(defer(self(), &Self::_shutdown, lambda::_1));
- }
-
- void _shutdown(const Future<Connection>& connection)
- {
- if (!connection.isReady()) {
- LOG(ERROR)
- << "Unable to establish connection with the agent: "
- << (connection.isFailed() ? connection.failure() : "discarded");
- __shutdown();
- return;
- }
-
- // It is possible that the agent process failed before we could
- // kill the child containers.
- if (state == DISCONNECTED || state == CONNECTED) {
- LOG(ERROR) << "Unable to kill child containers as the "
- << "executor is in state " << state;
- __shutdown();
- return;
- }
-
list<Future<Nothing>> killing;
foreachkey (const ContainerID& containerId, containers) {
- killing.push_back(kill(connection.get(), containerId));
+ killing.push_back(kill(containerId));
}
// It is possible that the agent process can fail while we are
- // killing child containers. We fail fast if this happens. We
- // capture `connection` to ensure that the connection is not
- // disconnected before the responses are complete.
+ // killing child containers. We fail fast if this happens.
collect(killing)
.onAny(defer(
self(),
- [this, connection](const Future<list<Nothing>>& future) {
+ [this](const Future<list<Nothing>>& future) {
if (future.isReady()) {
return;
}
@@ -779,7 +754,7 @@ protected:
terminate(self());
}
- Future<Nothing> kill(Connection connection, const ContainerID& containerId)
+ Future<Nothing> kill(const ContainerID& containerId)
{
CHECK_EQ(SUBSCRIBED, state);
CHECK(containers.contains(containerId));
@@ -794,7 +769,7 @@ protected:
kill->mutable_container_id()->CopyFrom(containerId);
- return post(connection, call)
+ return post(None(), call)
.then([](const Response& /* response */) {
return Nothing();
});
@@ -906,17 +881,24 @@ private:
mesos->send(evolve(call));
}
- Future<Response> post(Connection connection, const agent::Call& call)
+ Future<Response> post(
+ Option<Connection> connection,
+ const agent::Call& call)
{
::Request request;
request.method = "POST";
request.url = agent;
request.body = serialize(contentType, evolve(call));
- request.keepAlive = true;
request.headers = {{"Accept", stringify(contentType)},
{"Content-Type", stringify(contentType)}};
- return connection.send(request);
+ // Only pipeline requests when there is an active connection.
+ if (connection.isSome()) {
+ request.keepAlive = true;
+ }
+
+ return connection.isSome() ? connection->send(request)
+ : process::http::request(request);
}
void retry(