You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/24 04:36:11 UTC
[5/6] mesos git commit: Made COMMAND health checks resilient to agent
failovers.
Made COMMAND health checks resilient to agent failovers.
Review: https://reviews.apache.org/r/57646/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0c0fbc57
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0c0fbc57
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0c0fbc57
Branch: refs/heads/master
Commit: 0c0fbc57bed2ab26dff516491c6264f37d14cd4f
Parents: 85edc8f
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Fri Mar 24 00:50:04 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 05:29:12 2017 +0100
----------------------------------------------------------------------
src/checks/health_checker.cpp | 228 +++++++++++++++++++++++--------------
src/checks/health_checker.hpp | 16 ++-
2 files changed, 155 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 3290eb6..1c098d1 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -34,6 +34,7 @@
#include <process/collect.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
+#include <process/future.hpp>
#include <process/http.hpp>
#include <process/io.hpp>
#include <process/subprocess.hpp>
@@ -68,6 +69,7 @@ using process::Clock;
using process::Failure;
using process::Future;
using process::Owned;
+using process::Promise;
using process::Subprocess;
using process::Time;
@@ -75,6 +77,7 @@ using process::http::Connection;
using process::http::Response;
using std::map;
+using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;
@@ -377,6 +380,13 @@ void HealthCheckerProcess::processCheckResult(
return;
}
+ if (future.isDiscarded()) {
+ LOG(INFO) << HealthCheck::Type_Name(check.type()) +
+ " health check of task " + stringify(taskId) + " discarded";
+ scheduleNext(checkInterval);
+ return;
+ }
+
VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type())
<< " health check in " << stopwatch.elapsed();
@@ -386,8 +396,7 @@ void HealthCheckerProcess::processCheckResult(
}
string message = HealthCheck::Type_Name(check.type()) +
- " health check failed: " +
- (future.isFailed() ? future.failure() : "discarded");
+ " health check failed: " + future.failure();
failure(message);
}
@@ -489,16 +498,37 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
VLOG(1) << "Launching command health check of task " << stringify(taskId);
- return process::http::connect(agentURL.get())
- .repair([](const Future<Connection>& future) {
- return Failure(
- "Unable to establish connection with the agent: " + future.failure());
- })
- .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1));
+ // We don't want recoverable errors, e.g., the agent responding with
+ // HTTP status code 503, to trigger a health check failure.
+ //
+ // The future returned by this method represents the result of a
+ // health check. It will be set to `Nothing` if the check succeeded,
+ // to a `Failure` if it failed, and discarded if there was a transient
+ // error.
+ auto promise = std::make_shared<Promise<Nothing>>();
+
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
+ const TaskID _taskId = taskId;
+
+ process::http::connect(agentURL.get())
+ .onFailed(defer(self(), [_taskId, promise](const string& failure) {
+ LOG(WARNING) << "Unable to establish connection with the agent to launch"
+ << " COMMAND health check for task '" << _taskId
+ << "': " << failure;
+
+ // We treat this as a transient failure.
+ promise->discard();
+ }))
+ .onReady(defer(self(),
+ &Self::_nestedCommandHealthCheck, promise, lambda::_1));
+
+ return promise->future();
}
-Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
+void HealthCheckerProcess::_nestedCommandHealthCheck(
+ shared_ptr<process::Promise<Nothing>> promise,
Connection connection)
{
// TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe.
@@ -525,6 +555,12 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
{"Message-Accept", stringify(ContentType::PROTOBUF)},
{"Content-Type", stringify(ContentType::PROTOBUF)}};
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once they are available.
+ const Duration timeout = checkTimeout;
+
+ auto checkTimedOut = std::make_shared<bool>(false);
+
// `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
// the output of the container. The agent will close the stream once
// the container has exited, or kill the container if the client
@@ -537,100 +573,123 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
// This means that this future will not be completed until after the
// health check command has finished or the connection has been
// closed.
- return connection.send(request, false)
+ connection.send(request, false)
.after(checkTimeout,
- defer(self(),
- &Self::nestedCommandHealthCheckTimedOut,
- checkContainerId,
- connection,
- lambda::_1))
- .then(defer(self(),
- &Self::__nestedCommandHealthCheck,
- checkContainerId,
- lambda::_1));
+ defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+ future.discard();
+
+ *checkTimedOut = true;
+
+ return Failure(
+ "Command timed out after " + stringify(timeout) + "; aborting");
+ }))
+ .onFailed(defer(self(),
+ &Self::nestedCommandHealthCheckFailure,
+ promise,
+ connection,
+ checkContainerId,
+ checkTimedOut,
+ lambda::_1))
+ .onReady(defer(self(),
+ &Self::__nestedCommandHealthCheck,
+ promise,
+ checkContainerId,
+ lambda::_1));
}
-Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck(
+void HealthCheckerProcess::__nestedCommandHealthCheck(
+ shared_ptr<process::Promise<Nothing>> promise,
const ContainerID& checkContainerId,
const Response& launchResponse)
{
if (launchResponse.code != process::http::Status::OK) {
- return Failure(
- "Received '" + launchResponse.status + "' (" + launchResponse.body +
- ") while launching a command health check of task '" +
- stringify(taskId) + "'");
+ // The agent was unable to launch the health check container, we
+ // treat this as a transient failure.
+ LOG(WARNING) << "Received '" << launchResponse.status << "' ("
+ << launchResponse.body << ") while launching command health "
+ << "check of task " << stringify(taskId);
+
+ promise->discard();
+ return;
}
// We need to make a copy so that the lambdas can capture it.
- const TaskID taskId_ = taskId;
+ const TaskID _taskId = taskId;
- return waitNestedContainer(checkContainerId)
- .repair([taskId_](const Future<Option<int>>& future) {
- return Failure(
- "Unable to get the exit code of command health check of task '" +
- stringify(taskId_) + "': " + future.failure());
+ waitNestedContainer(checkContainerId)
+ .onFailed([_taskId, promise](const string& failure) {
+ promise->fail(
+ "Unable to get the exit code of command health check of task " +
+ stringify(_taskId) + ": " + failure);
})
- .then([taskId_](const Option<int> status) -> Future<Nothing> {
+ .onReady([_taskId, promise](const Option<int>& status) -> void {
if (status.isNone()) {
- return Failure(
- "Unable to get the exit code of command health check of task '" +
- stringify(taskId_) + "'");
+ promise->fail(
+ "Unable to get the exit code of command health check of task " +
+ stringify(_taskId));
+ // TODO(gkleiman): Make sure that the following block works on Windows.
+ } else if (WIFSIGNALED(status.get()) &&
+ WTERMSIG(status.get()) == SIGKILL) {
+ // The check container was signaled, probably because the task
+ // finished while the check was still in-flight, so we discard
+ // the result.
+ promise->discard();
} else if (status.get() != 0) {
- return Failure(
- "Command health check of task '" + stringify(taskId_) +
- "' returned " + WSTRINGIFY(status.get()));
+ promise->fail(
+ "Command health check of task " + stringify(_taskId) +
+ " returned " + WSTRINGIFY(status.get()));
} else {
- return Nothing();
+ promise->set(Nothing());
}
});
}
-Future<Response>
-HealthCheckerProcess::nestedCommandHealthCheckTimedOut(
- const ContainerID& checkContainerId,
+void HealthCheckerProcess::nestedCommandHealthCheckFailure(
+ shared_ptr<Promise<Nothing>> promise,
Connection connection,
- Future<Response> future)
+ ContainerID checkContainerId,
+ shared_ptr<bool> checkTimedOut,
+ const string& failure)
{
- future.discard();
-
- // Closing the connection will make the agent kill the container.
- connection.disconnect();
-
- const Failure failure = Failure(
- "Command health check of task '" + stringify(taskId) +
- "' has timed out after " + stringify(checkTimeout));
-
- // We need to make a copy so that the lambda can capture it.
- const TaskID taskId_ = taskId;
-
- // If the health check delay interval is zero, we'll try to perform
- // another health check right after we finish processing the current
- // timeout.
- //
- // All the containers created for the health checks reuse the same
- // `ContainerID`. In order to prevent conflicts, the future returned
- // by this method should only be completed once we're sure that the
- // container has been cleaned up.
- return waitNestedContainer(checkContainerId)
- .repair([failure, taskId_](const Future<Option<int>>& waitFuture) {
- // We assume that once `WaitNestedContainer` returns, irrespective of
- // whether the response contains a failure, the container will be in a
- // terminal state, so starting a new health check will not lead to a
- // transient failure.
- //
- // This means that we don't need to retry the `WaitNestedContainer`
- // call.
- LOG(WARNING) << "Unable to get the exit code of command health check of "
- << "task '" << stringify(taskId_)
- << "': " << waitFuture.failure();
-
- return Future<Option<int>>(failure);
- })
- .then([failure](const Option<int>&) {
- return Future<Response>(failure);
- });
+ if (*checkTimedOut) {
+ // The health check timed out, closing the connection will make the
+ // agent kill the container.
+ connection.disconnect();
+
+ // If the health check delay interval is zero, we'll try to perform
+ // another health check right after we finish processing the current
+ // timeout.
+ //
+ // All the containers created for the health checks reuse the same
+ // `ContainerID`. In order to prevent conflicts, the promise should
+ // be completed once we're sure that the container has been cleaned
+ // up.
+ waitNestedContainer(checkContainerId)
+ .onAny([failure, promise](const Future<Option<int>>&) {
+ // We assume that once `WaitNestedContainer` returns, irrespective of
+ // whether the response contains a failure, the container will be in a
+ // terminal state, so starting a new health check will not lead to a
+ // transient failure.
+ //
+ // This means that we don't need to retry the `WaitNestedContainer`
+ // call.
+ promise->fail(failure);
+ });
+ } else {
+ // The agent was not able to complete the request, discarding the
+ // promise signals the health checker that it should retry the
+ // health check.
+ //
+ // This will allow us to recover from a blip. The executor will
+ // pause the health checker when it detects that the agent is not
+ // available.
+ LOG(WARNING) << "Connection to the agent to launch COMMAND health check"
+ << " for task '" << taskId << "' failed: " << failure;
+
+ promise->discard();
+ }
}
@@ -653,10 +712,13 @@ Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
{"Content-Type", stringify(ContentType::PROTOBUF)}};
return process::http::request(request, false)
+ .repair([this](const Future<Response>& future) {
+ return Failure(
+ "Connection to wait for a health check of task " +
+ stringify(taskId) + " failed: " + future.failure());
+ })
.then(defer(self(),
- &Self::_waitNestedContainer,
- containerId,
- lambda::_1));
+ &Self::_waitNestedContainer, containerId, lambda::_1));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index 29df49b..3d17ea8 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -17,6 +17,7 @@
#ifndef __HEALTH_CHECKER_HPP__
#define __HEALTH_CHECKER_HPP__
+#include <memory>
#include <string>
#include <tuple>
#include <vector>
@@ -156,18 +157,21 @@ private:
process::Future<Nothing> nestedCommandHealthCheck();
- process::Future<Nothing> _nestedCommandHealthCheck(
+ void _nestedCommandHealthCheck(
+ std::shared_ptr<process::Promise<Nothing>> promise,
process::http::Connection connection);
- process::Future<Nothing> __nestedCommandHealthCheck(
+ void __nestedCommandHealthCheck(
+ std::shared_ptr<process::Promise<Nothing>> promise,
const ContainerID& checkContainerId,
const process::http::Response& launchResponse);
- process::Future<process::http::Response>
- nestedCommandHealthCheckTimedOut(
- const ContainerID& checkContainerId,
+ void nestedCommandHealthCheckFailure(
+ std::shared_ptr<process::Promise<Nothing>> promise,
process::http::Connection connection,
- process::Future<process::http::Response> future);
+ ContainerID checkContainerId,
+ std::shared_ptr<bool> checkTimedOut,
+ const std::string& failure);
/**
* Waits for a container to be terminated.