You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/30 17:35:38 UTC
[1/4] mesos git commit: Cleaned up namespaces in "checker.cpp".
Repository: mesos
Updated Branches:
refs/heads/master 080e1b7eb -> 1a1fa95d0
Cleaned up namespaces in "checker.cpp".
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a1fa95d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a1fa95d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a1fa95d
Branch: refs/heads/master
Commit: 1a1fa95d0de179d7efab002a99a0e6261ce307f9
Parents: 3f81c6f
Author: Alexander Rukletsov <al...@apache.org>
Authored: Thu Mar 30 17:53:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200
----------------------------------------------------------------------
src/checks/checker.cpp | 108 ++++++++++++++++++++------------------------
1 file changed, 50 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1a1fa95d/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index e48e037..7510bf2 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -68,15 +68,14 @@
#include "linux/ns.hpp"
#endif
+namespace http = process::http;
+
using process::Failure;
using process::Future;
using process::Owned;
using process::Promise;
using process::Subprocess;
-using process::http::Connection;
-using process::http::Response;
-
using std::map;
using std::shared_ptr;
using std::string;
@@ -139,7 +138,7 @@ public:
const Option<pid_t>& _taskPid,
const vector<string>& _namespaces,
const Option<ContainerID>& _taskContainerId,
- const Option<process::http::URL>& _agentURL,
+ const Option<http::URL>& _agentURL,
bool _commandCheckViaAgent);
void pause();
@@ -158,44 +157,40 @@ private:
const Stopwatch& stopwatch,
const Option<CheckStatusInfo>& result);
- process::Future<int> commandCheck();
+ Future<int> commandCheck();
- process::Future<int> nestedCommandCheck();
- void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise);
+ Future<int> nestedCommandCheck();
+ void _nestedCommandCheck(shared_ptr<Promise<int>> promise);
void __nestedCommandCheck(
- std::shared_ptr<process::Promise<int>> promise,
- process::http::Connection connection);
+ shared_ptr<Promise<int>> promise,
+ http::Connection connection);
void ___nestedCommandCheck(
- std::shared_ptr<process::Promise<int>> promise,
+ shared_ptr<Promise<int>> promise,
const ContainerID& checkContainerId,
- const process::http::Response& launchResponse);
+ const http::Response& launchResponse);
void nestedCommandCheckFailure(
- std::shared_ptr<process::Promise<int>> promise,
- process::http::Connection connection,
+ shared_ptr<Promise<int>> promise,
+ http::Connection connection,
ContainerID checkContainerId,
- std::shared_ptr<bool> checkTimedOut,
- const std::string& failure);
+ shared_ptr<bool> checkTimedOut,
+ const string& failure);
- process::Future<Option<int>> waitNestedContainer(
- const ContainerID& containerId);
- process::Future<Option<int>> _waitNestedContainer(
+ Future<Option<int>> waitNestedContainer(const ContainerID& containerId);
+ Future<Option<int>> _waitNestedContainer(
const ContainerID& containerId,
- const process::http::Response& httpResponse);
+ const http::Response& httpResponse);
void processCommandCheckResult(
const Stopwatch& stopwatch,
- const process::Future<int>& result);
-
- process::Future<int> httpCheck();
- process::Future<int> _httpCheck(
- const std::tuple<
- process::Future<Option<int>>,
- process::Future<std::string>,
- process::Future<std::string>>& t);
+ const Future<int>& result);
+
+ Future<int> httpCheck();
+ Future<int> _httpCheck(
+ const tuple<Future<Option<int>>, Future<string>, Future<string>>& t);
void processHttpCheckResult(
const Stopwatch& stopwatch,
- const process::Future<int>& result);
+ const Future<int>& result);
const CheckInfo check;
Duration checkDelay;
@@ -205,9 +200,9 @@ private:
const lambda::function<void(const CheckStatusInfo&)> updateCallback;
const TaskID taskId;
const Option<pid_t> taskPid;
- const std::vector<std::string> namespaces;
+ const vector<string> namespaces;
const Option<ContainerID> taskContainerId;
- const Option<process::http::URL> agentURL;
+ const Option<http::URL> agentURL;
const bool commandCheckViaAgent;
Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
@@ -248,12 +243,12 @@ Try<Owned<Checker>> Checker::create(
}
-Try<process::Owned<Checker>> Checker::create(
+Try<Owned<Checker>> Checker::create(
const CheckInfo& check,
const lambda::function<void(const CheckStatusInfo&)>& callback,
const TaskID& taskId,
const ContainerID& taskContainerId,
- const process::http::URL& agentURL)
+ const http::URL& agentURL)
{
// Validate the `CheckInfo` protobuf.
Option<Error> error = validation::checkInfo(check);
@@ -308,7 +303,7 @@ CheckerProcess::CheckerProcess(
const Option<pid_t>& _taskPid,
const vector<string>& _namespaces,
const Option<ContainerID>& _taskContainerId,
- const Option<process::http::URL>& _agentURL,
+ const Option<http::URL>& _agentURL,
bool _commandCheckViaAgent)
: ProcessBase(process::ID::generate("checker")),
check(_check),
@@ -587,14 +582,14 @@ Future<int> CheckerProcess::nestedCommandCheck()
removeContainer->mutable_container_id()->CopyFrom(
previousCheckContainerId.get());
- process::http::Request request;
+ http::Request request;
request.method = "POST";
request.url = agentURL.get();
request.body = serialize(ContentType::PROTOBUF, evolve(call));
request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
{"Content-Type", stringify(ContentType::PROTOBUF)}};
- process::http::request(request, false)
+ http::request(request, false)
.onFailed(defer(self(),
[this, promise](const string& failure) {
LOG(WARNING) << "Connection to remove the nested container '"
@@ -606,8 +601,8 @@ Future<int> CheckerProcess::nestedCommandCheck()
// as a transient failure and discard the promise.
promise->discard();
}))
- .onReady(defer(self(), [this, promise](const Response& response) {
- if (response.code != process::http::Status::OK) {
+ .onReady(defer(self(), [this, promise](const http::Response& response) {
+ if (response.code != http::Status::OK) {
// The agent was unable to remove the check container, we
// treat this as a transient failure and discard the promise.
LOG(WARNING) << "Received '" << response.status << "' ("
@@ -630,14 +625,13 @@ Future<int> CheckerProcess::nestedCommandCheck()
}
-void CheckerProcess::_nestedCommandCheck(
- shared_ptr<process::Promise<int>> promise)
+void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise)
{
// TODO(alexr): Use a lambda named capture for
// this cached value once it is available.
const TaskID _taskId = taskId;
- process::http::connect(agentURL.get())
+ http::connect(agentURL.get())
.onFailed(defer(self(), [_taskId, promise](const string& failure) {
LOG(WARNING) << "Unable to establish connection with the agent to launch"
<< " COMMAND check for task '" << _taskId << "'"
@@ -651,8 +645,8 @@ void CheckerProcess::_nestedCommandCheck(
void CheckerProcess::__nestedCommandCheck(
- shared_ptr<process::Promise<int>> promise,
- Connection connection)
+ shared_ptr<Promise<int>> promise,
+ http::Connection connection)
{
ContainerID checkContainerId;
checkContainerId.set_value("check-" + UUID::random().toString());
@@ -671,7 +665,7 @@ void CheckerProcess::__nestedCommandCheck(
launch->mutable_container_id()->CopyFrom(checkContainerId);
launch->mutable_command()->CopyFrom(command);
- process::http::Request request;
+ http::Request request;
request.method = "POST";
request.url = agentURL.get();
request.body = serialize(ContentType::PROTOBUF, evolve(call));
@@ -698,7 +692,8 @@ void CheckerProcess::__nestedCommandCheck(
// check command has finished or the connection has been closed.
connection.send(request, false)
.after(checkTimeout,
- defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+ defer(self(),
+ [timeout, checkTimedOut](Future<http::Response> future) {
future.discard();
*checkTimedOut = true;
@@ -721,11 +716,11 @@ void CheckerProcess::__nestedCommandCheck(
void CheckerProcess::___nestedCommandCheck(
- shared_ptr<process::Promise<int>> promise,
+ shared_ptr<Promise<int>> promise,
const ContainerID& checkContainerId,
- const Response& launchResponse)
+ const http::Response& launchResponse)
{
- if (launchResponse.code != process::http::Status::OK) {
+ if (launchResponse.code != http::Status::OK) {
// The agent was unable to launch the check container,
// we treat this as a transient failure.
LOG(WARNING) << "Received '" << launchResponse.status << "' ("
@@ -760,7 +755,7 @@ void CheckerProcess::___nestedCommandCheck(
void CheckerProcess::nestedCommandCheckFailure(
shared_ptr<Promise<int>> promise,
- Connection connection,
+ http::Connection connection,
ContainerID checkContainerId,
shared_ptr<bool> checkTimedOut,
const string& failure)
@@ -814,15 +809,15 @@ Future<Option<int>> CheckerProcess::waitNestedContainer(
containerWait->mutable_container_id()->CopyFrom(containerId);
- process::http::Request request;
+ http::Request request;
request.method = "POST";
request.url = agentURL.get();
request.body = serialize(ContentType::PROTOBUF, evolve(call));
request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
{"Content-Type", stringify(ContentType::PROTOBUF)}};
- return process::http::request(request, false)
- .repair([containerId](const Future<Response>& future) {
+ return http::request(request, false)
+ .repair([containerId](const Future<http::Response>& future) {
return Failure(
"Connection to wait for check container '" +
stringify(containerId) + "' failed: " + future.failure());
@@ -834,9 +829,9 @@ Future<Option<int>> CheckerProcess::waitNestedContainer(
Future<Option<int>> CheckerProcess::_waitNestedContainer(
const ContainerID& containerId,
- const Response& httpResponse)
+ const http::Response& httpResponse)
{
- if (httpResponse.code != process::http::Status::OK) {
+ if (httpResponse.code != http::Status::OK) {
return Failure(
"Received '" + httpResponse.status + "' (" + httpResponse.body +
") while waiting on check container '" + stringify(containerId) + "'");
@@ -979,10 +974,7 @@ Future<int> CheckerProcess::httpCheck()
Future<int> CheckerProcess::_httpCheck(
- const tuple<
- Future<Option<int>>,
- Future<string>,
- Future<string>>& t)
+ const tuple<Future<Option<int>>, Future<string>, Future<string>>& t)
{
const Future<Option<int>>& status = std::get<0>(t);
if (!status.isReady()) {
@@ -1032,7 +1024,7 @@ Future<int> CheckerProcess::_httpCheck(
void CheckerProcess::processHttpCheckResult(
const Stopwatch& stopwatch,
- const process::Future<int>& result)
+ const Future<int>& result)
{
CheckStatusInfo checkStatusInfo;
checkStatusInfo.set_type(check.type());
[3/4] mesos git commit: Added support for COMMAND checks to the
default executor.
Posted by al...@apache.org.
Added support for COMMAND checks to the default executor.
Review: https://reviews.apache.org/r/58030/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f81c6f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f81c6f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f81c6f6
Branch: refs/heads/master
Commit: 3f81c6f6052768e326e84e2eab93c20572b490ad
Parents: 3a689ab
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 17:14:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200
----------------------------------------------------------------------
src/checks/checker.cpp | 454 ++++++++++++++++++--
src/checks/checker.hpp | 32 +-
src/launcher/default_executor.cpp | 8 +-
src/tests/check_tests.cpp | 733 ++++++++++++++++++++++++++++++++-
4 files changed, 1181 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index d1e9083..e48e037 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -19,6 +19,7 @@
#include <cstdint>
#include <iterator>
#include <map>
+#include <memory>
#include <string>
#include <tuple>
#include <vector>
@@ -28,6 +29,8 @@
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
+#include <mesos/agent/agent.hpp>
+
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -49,14 +52,18 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
#include <stout/os/environment.hpp>
#include <stout/os/killtree.hpp>
+#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
+#include "internal/evolve.hpp"
+
#ifdef __linux__
#include "linux/ns.hpp"
#endif
@@ -64,9 +71,14 @@
using process::Failure;
using process::Future;
using process::Owned;
+using process::Promise;
using process::Subprocess;
+using process::http::Connection;
+using process::http::Response;
+
using std::map;
+using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;
@@ -125,7 +137,10 @@ public:
const lambda::function<void(const CheckStatusInfo&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const std::vector<std::string>& _namespaces);
+ const vector<string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent);
void pause();
void resume();
@@ -141,9 +156,33 @@ private:
void scheduleNext(const Duration& duration);
void processCheckResult(
const Stopwatch& stopwatch,
- const CheckStatusInfo& result);
+ const Option<CheckStatusInfo>& result);
process::Future<int> commandCheck();
+
+ process::Future<int> nestedCommandCheck();
+ void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise);
+ void __nestedCommandCheck(
+ std::shared_ptr<process::Promise<int>> promise,
+ process::http::Connection connection);
+ void ___nestedCommandCheck(
+ std::shared_ptr<process::Promise<int>> promise,
+ const ContainerID& checkContainerId,
+ const process::http::Response& launchResponse);
+
+ void nestedCommandCheckFailure(
+ std::shared_ptr<process::Promise<int>> promise,
+ process::http::Connection connection,
+ ContainerID checkContainerId,
+ std::shared_ptr<bool> checkTimedOut,
+ const std::string& failure);
+
+ process::Future<Option<int>> waitNestedContainer(
+ const ContainerID& containerId);
+ process::Future<Option<int>> _waitNestedContainer(
+ const ContainerID& containerId,
+ const process::http::Response& httpResponse);
+
void processCommandCheckResult(
const Stopwatch& stopwatch,
const process::Future<int>& result);
@@ -167,10 +206,18 @@ private:
const TaskID taskId;
const Option<pid_t> taskPid;
const std::vector<std::string> namespaces;
+ const Option<ContainerID> taskContainerId;
+ const Option<process::http::URL> agentURL;
+ const bool commandCheckViaAgent;
+
Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
CheckStatusInfo previousCheckStatus;
bool paused;
+
+ // Contains the ID of the most recently terminated nested container
+ // that was used to perform a COMMAND check.
+ Option<ContainerID> previousCheckContainerId;
};
@@ -192,7 +239,37 @@ Try<Owned<Checker>> Checker::create(
callback,
taskId,
taskPid,
- namespaces));
+ namespaces,
+ None(),
+ None(),
+ false));
+
+ return Owned<Checker>(new Checker(process));
+}
+
+
+Try<process::Owned<Checker>> Checker::create(
+ const CheckInfo& check,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL)
+{
+ // Validate the `CheckInfo` protobuf.
+ Option<Error> error = validation::checkInfo(check);
+ if (error.isSome()) {
+ return error.get();
+ }
+
+ Owned<CheckerProcess> process(new CheckerProcess(
+ check,
+ callback,
+ taskId,
+ None(),
+ {},
+ taskContainerId,
+ agentURL,
+ true));
return Owned<Checker>(new Checker(process));
}
@@ -229,13 +306,19 @@ CheckerProcess::CheckerProcess(
const lambda::function<void(const CheckStatusInfo&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const vector<string>& _namespaces)
+ const vector<string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent)
: ProcessBase(process::ID::generate("checker")),
check(_check),
updateCallback(_callback),
taskId(_taskId),
taskPid(_taskPid),
namespaces(_namespaces),
+ taskContainerId(_taskContainerId),
+ agentURL(_agentURL),
+ commandCheckViaAgent(_commandCheckViaAgent),
paused(false)
{
Try<Duration> create = Duration::create(check.delay_seconds());
@@ -306,7 +389,9 @@ void CheckerProcess::performCheck()
switch (check.type()) {
case CheckInfo::COMMAND: {
- commandCheck().onAny(defer(
+ Future<int> future = commandCheckViaAgent ? nestedCommandCheck()
+ : commandCheck();
+ future.onAny(defer(
self(),
&Self::processCommandCheckResult, stopwatch, lambda::_1));
break;
@@ -361,7 +446,7 @@ void CheckerProcess::resume()
void CheckerProcess::processCheckResult(
const Stopwatch& stopwatch,
- const CheckStatusInfo& result)
+ const Option<CheckStatusInfo>& result)
{
// `Checker` might have been paused while performing the check.
if (paused) {
@@ -370,16 +455,20 @@ void CheckerProcess::processCheckResult(
return;
}
- VLOG(1) << "Performed " << check.type() << " check"
- << " for task '" << taskId << "' in " << stopwatch.elapsed();
-
- // Trigger the callback if check info changes.
- if (result != previousCheckStatus) {
- // We assume this is a local send, i.e., the checker library is not used
- // in a binary external to the executor and hence can not exit before
- // the data is sent to the executor.
- updateCallback(result);
- previousCheckStatus = result;
+ // `result` should be some if it was possible to perform the check,
+ // and empty if there was a transient error.
+ if (result.isSome()) {
+ VLOG(1) << "Performed " << check.type() << " check"
+ << " for task '" << taskId << "' in " << stopwatch.elapsed();
+
+ // Trigger the callback if check info changes.
+ if (result.get() != previousCheckStatus) {
+ // We assume this is a local send, i.e., the checker library is not used
+ // in a binary external to the executor and hence can not exit before
+ // the data is sent to the executor.
+ updateCallback(result.get());
+ previousCheckStatus = result.get();
+ }
}
scheduleNext(checkInterval);
@@ -470,37 +559,346 @@ Future<int> CheckerProcess::commandCheck()
}
+Future<int> CheckerProcess::nestedCommandCheck()
+{
+ CHECK_EQ(CheckInfo::COMMAND, check.type());
+ CHECK(check.has_command());
+ CHECK_SOME(taskContainerId);
+ CHECK_SOME(agentURL);
+
+ VLOG(1) << "Launching COMMAND check for task '" << taskId << "'";
+
+ // We don't want recoverable errors, e.g., the agent responding with
+ // HTTP status code 503, to trigger a check failure.
+ //
+ // The future returned by this method represents the result of a
+ // check. It will be set to the exit status of the check command if it
+ // succeeded, to a `Failure` if there was a non-transient error, and
+ // discarded if there was a transient error.
+ auto promise = std::make_shared<Promise<int>>();
+
+ if (previousCheckContainerId.isSome()) {
+ agent::Call call;
+ call.set_type(agent::Call::REMOVE_NESTED_CONTAINER);
+
+ agent::Call::RemoveNestedContainer* removeContainer =
+ call.mutable_remove_nested_container();
+
+ removeContainer->mutable_container_id()->CopyFrom(
+ previousCheckContainerId.get());
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ process::http::request(request, false)
+ .onFailed(defer(self(),
+ [this, promise](const string& failure) {
+ LOG(WARNING) << "Connection to remove the nested container '"
+ << previousCheckContainerId.get()
+ << "' used for the COMMAND check for task '"
+ << taskId << "' failed: " << failure;
+
+ // Something went wrong while sending the request, we treat this
+ // as a transient failure and discard the promise.
+ promise->discard();
+ }))
+ .onReady(defer(self(), [this, promise](const Response& response) {
+ if (response.code != process::http::Status::OK) {
+ // The agent was unable to remove the check container, we
+ // treat this as a transient failure and discard the promise.
+ LOG(WARNING) << "Received '" << response.status << "' ("
+ << response.body << ") while removing the nested"
+ << " container '" << previousCheckContainerId.get()
+ << "' used for the COMMAND check for task '"
+ << taskId << "'";
+
+ promise->discard();
+ }
+
+ previousCheckContainerId = None();
+ _nestedCommandCheck(promise);
+ }));
+ } else {
+ _nestedCommandCheck(promise);
+ }
+
+ return promise->future();
+}
+
+
+void CheckerProcess::_nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise)
+{
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
+ const TaskID _taskId = taskId;
+
+ process::http::connect(agentURL.get())
+ .onFailed(defer(self(), [_taskId, promise](const string& failure) {
+ LOG(WARNING) << "Unable to establish connection with the agent to launch"
+ << " COMMAND check for task '" << _taskId << "'"
+ << ": " << failure;
+
+ // We treat this as a transient failure.
+ promise->discard();
+ }))
+ .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1));
+}
+
+
+void CheckerProcess::__nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise,
+ Connection connection)
+{
+ ContainerID checkContainerId;
+ checkContainerId.set_value("check-" + UUID::random().toString());
+ checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get());
+
+ previousCheckContainerId = checkContainerId;
+
+ CommandInfo command(check.command().command());
+
+ agent::Call call;
+ call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ agent::Call::LaunchNestedContainerSession* launch =
+ call.mutable_launch_nested_container_session();
+
+ launch->mutable_container_id()->CopyFrom(checkContainerId);
+ launch->mutable_command()->CopyFrom(command);
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
+ {"Message-Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
+ const Duration timeout = checkTimeout;
+
+ auto checkTimedOut = std::make_shared<bool>(false);
+
+ // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
+ // the output of the container. The agent will close the stream once
+ // the container has exited, or kill the container if the client
+ // closes the connection.
+ //
+ // We're calling `Connection::send` with `streamed = false`, so that
+ // it returns an HTTP response of type 'BODY' once the entire response
+ // is received.
+ //
+ // This means that this future will not be completed until after the
+ // check command has finished or the connection has been closed.
+ connection.send(request, false)
+ .after(checkTimeout,
+ defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+ future.discard();
+
+ *checkTimedOut = true;
+
+ return Failure("Command timed out after " + stringify(timeout));
+ }))
+ .onFailed(defer(self(),
+ &Self::nestedCommandCheckFailure,
+ promise,
+ connection,
+ checkContainerId,
+ checkTimedOut,
+ lambda::_1))
+ .onReady(defer(self(),
+ &Self::___nestedCommandCheck,
+ promise,
+ checkContainerId,
+ lambda::_1));
+}
+
+
+void CheckerProcess::___nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise,
+ const ContainerID& checkContainerId,
+ const Response& launchResponse)
+{
+ if (launchResponse.code != process::http::Status::OK) {
+ // The agent was unable to launch the check container,
+ // we treat this as a transient failure.
+ LOG(WARNING) << "Received '" << launchResponse.status << "' ("
+ << launchResponse.body << ") while launching COMMAND check"
+ << " for task '" << taskId << "'";
+
+ promise->discard();
+ return;
+ }
+
+ waitNestedContainer(checkContainerId)
+ .onFailed([promise](const string& failure) {
+ promise->fail(
+ "Unable to get the exit code: " + failure);
+ })
+ .onReady([promise](const Option<int>& status) -> void {
+ if (status.isNone()) {
+ promise->fail("Unable to get the exit code");
+ // TODO(gkleiman): Make sure that the following block works on Windows.
+ } else if (WIFSIGNALED(status.get()) &&
+ WTERMSIG(status.get()) == SIGKILL) {
+ // The check container was signaled, probably because the task
+ // finished while the check was still in-flight, so we discard
+ // the result.
+ promise->discard();
+ } else {
+ promise->set(status.get());
+ }
+ });
+}
+
+
+void CheckerProcess::nestedCommandCheckFailure(
+ shared_ptr<Promise<int>> promise,
+ Connection connection,
+ ContainerID checkContainerId,
+ shared_ptr<bool> checkTimedOut,
+ const string& failure)
+{
+ if (*checkTimedOut) {
+ // The check timed out, closing the connection will make the agent
+ // kill the container.
+ connection.disconnect();
+
+ // If the check delay interval is zero, we'll try to perform another
+ // check right after we finish processing the current timeout.
+ //
+ // We'll try to remove the container created for the check at the
+ // beginning of the next check. In order to prevent a failure, the
+ // promise should only be completed once we're sure that the
+ // container has terminated.
+ waitNestedContainer(checkContainerId)
+ .onAny([failure, promise](const Future<Option<int>>&) {
+ // We assume that once `WaitNestedContainer` returns,
+ // irrespective of whether the response contains a failure, the
+ // container will be in a terminal state, and that it will be
+ // possible to remove it.
+ //
+ // This means that we don't need to retry the
+ // `WaitNestedContainer` call.
+ promise->fail(failure);
+ });
+ } else {
+ // The agent was not able to complete the request, discarding the
+ // promise signals the checker that it should retry the check.
+ //
+ // This will allow us to recover from a blip. The executor will
+ // pause the checker when it detects that the agent is not
+ // available.
+ LOG(WARNING) << "Connection to the agent to launch COMMAND check"
+ << " for task '" << taskId << "' failed: " << failure;
+
+ promise->discard();
+ }
+}
+
+
+Future<Option<int>> CheckerProcess::waitNestedContainer(
+ const ContainerID& containerId)
+{
+ agent::Call call;
+ call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
+
+ agent::Call::WaitNestedContainer* containerWait =
+ call.mutable_wait_nested_container();
+
+ containerWait->mutable_container_id()->CopyFrom(containerId);
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ return process::http::request(request, false)
+ .repair([containerId](const Future<Response>& future) {
+ return Failure(
+ "Connection to wait for check container '" +
+ stringify(containerId) + "' failed: " + future.failure());
+ })
+ .then(defer(self(),
+ &Self::_waitNestedContainer, containerId, lambda::_1));
+}
+
+
+Future<Option<int>> CheckerProcess::_waitNestedContainer(
+ const ContainerID& containerId,
+ const Response& httpResponse)
+{
+ if (httpResponse.code != process::http::Status::OK) {
+ return Failure(
+ "Received '" + httpResponse.status + "' (" + httpResponse.body +
+ ") while waiting on check container '" + stringify(containerId) + "'");
+ }
+
+ Try<agent::Response> response =
+ deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body);
+ CHECK_SOME(response);
+
+ CHECK(response->has_wait_nested_container());
+
+ return (
+ response->wait_nested_container().has_exit_status()
+ ? Option<int>(response->wait_nested_container().exit_status())
+ : Option<int>::none());
+}
+
+
void CheckerProcess::processCommandCheckResult(
const Stopwatch& stopwatch,
- const Future<int>& result)
+ const Future<int>& future)
{
- CheckStatusInfo checkStatusInfo;
- checkStatusInfo.set_type(check.type());
+ Option<CheckStatusInfo> result;
- // On Posix, `result` corresponds to termination information in the
+ // On Posix, `future` corresponds to termination information in the
// `stat_loc` area. On Windows, `status` is obtained via calling the
// `GetExitCodeProcess()` function.
//
// TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows,
// see MESOS-7242.
- if (result.isReady() && WIFEXITED(result.get())) {
- const int exitCode = WEXITSTATUS(result.get());
- VLOG(1) << check.type() << " check for task '"
- << taskId << "' returned: " << exitCode;
+ if (future.isReady() && WIFEXITED(future.get())) {
+ const int exitCode = WEXITSTATUS(future.get());
+ VLOG(1) << check.type() << " check for task '" << taskId << "'"
+ << " returned: " << exitCode;
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
checkStatusInfo.mutable_command()->set_exit_code(
static_cast<int32_t>(exitCode));
+
+ result = checkStatusInfo;
+ } else if (future.isDiscarded()) {
+ // Check's status is currently not available due to a transient error,
+ // e.g., due to the agent failover, no `CheckStatusInfo` message should
+ // be sent to the callback.
+ LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded";
+
+ result = None();
} else {
// Check's status is currently not available, which may indicate a change
// that should be reported as an empty `CheckStatusInfo.Command` message.
- LOG(WARNING) << check.type() << " check for task '" << taskId
- << "' failed: "
- << (result.isFailed() ? result.failure() : "discarded");
+ LOG(WARNING) << check.type() << " check for task '" << taskId << "'"
+ << " failed: " << future.failure();
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
checkStatusInfo.mutable_command();
+
+ result = checkStatusInfo;
}
- processCheckResult(stopwatch, checkStatusInfo);
+ processCheckResult(stopwatch, result);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index 1521b9c..fb939d8 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -22,6 +22,7 @@
#include <mesos/mesos.hpp>
+#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/error.hpp>
@@ -43,6 +44,9 @@ public:
* Attempts to create a `Checker` object. In case of success, checking
* starts immediately after initialization.
*
+ * If the check is a COMMAND check, the checker will fork a process, enter
+ * the task's namespaces, and execute the command.
+ *
* @param check The protobuf message definition of a check.
* @param callback A callback `Checker` uses to send check status updates
* to its owner (usually an executor).
@@ -56,12 +60,38 @@ public:
* `process::Stream<CheckStatusInfo>` rather than invoking a callback.
*/
static Try<process::Owned<Checker>> create(
- const CheckInfo& checkInfo,
+ const CheckInfo& check,
const lambda::function<void(const CheckStatusInfo&)>& callback,
const TaskID& taskId,
const Option<pid_t>& taskPid,
const std::vector<std::string>& namespaces);
+ /**
+ * Attempts to create a `Checker` object. In case of success, checking
+ * starts immediately after initialization.
+ *
+ * If the check is a COMMAND check, the checker will delegate the execution
+ * of the check to the Mesos agent via the `LaunchNestedContainerSession`
+ * API call.
+ *
+ * @param check The protobuf message definition of a check.
+ * @param callback A callback `Checker` uses to send check status updates
+ * to its owner (usually an executor).
+ * @param taskId The TaskID of the target task.
+ * @param taskContainerId The ContainerID of the target task.
+ * @param agentURL The URL of the agent.
+ * @return A `Checker` object or an error if `create` fails.
+ *
+ * @todo A better approach would be to return a stream of updates, e.g.,
+ * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
+ */
+ static Try<process::Owned<Checker>> create(
+ const CheckInfo& check,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL);
+
~Checker();
// Not copyable, not assignable.
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 79785fc..9cc40c6 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -503,17 +503,13 @@ protected:
false});
if (task.has_check()) {
- // TODO(alexr): Add support for command checks.
- CHECK_NE(CheckInfo::COMMAND, task.check().type())
- << "Command checks are not supported yet";
-
Try<Owned<checks::Checker>> checker =
checks::Checker::create(
task.check(),
defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1),
taskId,
- None(),
- vector<string>());
+ containerId,
+ agent);
if (checker.isError()) {
// TODO(anand): Should we send a TASK_FAILED instead?
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 78ac498..d7fcbf9 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -33,11 +33,16 @@
#include "checks/checker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "tests/flags.hpp"
#include "tests/health_check_test_helper.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+
using mesos::master::detector::MasterDetector;
using mesos::v1::scheduler::Call;
@@ -211,6 +216,17 @@ public:
mesos->send(call);
}
+
+ virtual void teardown(
+ Mesos* mesos,
+ const v1::FrameworkID& frameworkId)
+ {
+ Call call;
+ call.set_type(Call::TEARDOWN);
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+
+ mesos->send(call);
+ }
};
@@ -661,10 +677,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
v1::TaskInfo taskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+ // Set both check and health check interval to an increased value to
+ // prevent a second update coming before reconciliation response.
+ int interval = 10;
+
v1::CheckInfo* checkInfo = taskInfo.mutable_check();
checkInfo->set_type(v1::CheckInfo::COMMAND);
checkInfo->set_delay_seconds(0);
- checkInfo->set_interval_seconds(0);
+ checkInfo->set_interval_seconds(interval);
checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
// Delay health check for 1s to ensure health update comes after check update.
@@ -676,7 +696,7 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
healthCheckInfo->set_delay_seconds(1);
- healthCheckInfo->set_interval_seconds(0);
+ healthCheckInfo->set_interval_seconds(interval);
healthCheckInfo->mutable_command()->set_value("exit 0");
launchTask(&mesos, offer, taskInfo);
@@ -860,16 +880,707 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
// These are check tests with the default executor.
class DefaultExecutorCheckTest : public CheckTest {};
-// TODO(alexr): Implement following tests once the default executor supports
-// command checks.
+
+// Verifies that a command check is supported by the default executor,
+// its status is delivered in a task status update, and the last known
+// status can be obtained during explicit and implicit reconciliation.
+// Additionally ensures that the specified environment of the command
+// check is honored.
//
-// 1. COMMAND check with env var works, is delivered, and is reconciled
-// properly.
-// 2. COMMAND check's status change is delivered. TODO(alexr): When check
-// mocking is available, ensure only status changes are delivered.
-// 3. COMMAND check times out.
-// 4. COMMAND check and health check do not shadow each other; upon
-// reconciliation both statuses are available.
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+ DefaultExecutorCheckTest,
+ CommandCheckDeliveredAndReconciled)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Disable AuthN on the agent.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ Fetcher fetcher;
+
+ // We have to explicitly create a `Containerizer` in non-local mode,
+ // because `LaunchNestedContainerSession` (used by command checks)
+ // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> agent =
+ StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(agent);
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ const v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ v1::ExecutorInfo executorInfo;
+ executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+ executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+ executorInfo.mutable_resources()->CopyFrom(resources);
+ executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+ Seconds(10).ns());
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ subscribe(&mesos, frameworkInfo);
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ // Update `executorInfo` with the subscribed `frameworkId`.
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID agentId = offer.agent_id();
+
+ Future<Event::Update> updateTaskRunning;
+ Future<Event::Update> updateCheckResult;
+ Future<Event::Update> updateExplicitReconciliation;
+ Future<Event::Update> updateImplicitReconciliation;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskRunning))
+ .WillOnce(FutureArg<1>(&updateCheckResult))
+ .WillOnce(FutureArg<1>(&updateExplicitReconciliation))
+ .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+ v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+ checkInfo->set_type(v1::CheckInfo::COMMAND);
+ checkInfo->set_delay_seconds(0);
+ checkInfo->set_interval_seconds(0);
+
+ v1::CommandInfo* checkCommand =
+ checkInfo->mutable_command()->mutable_command();
+ checkCommand->set_value("exit $STATUS");
+
+ v1::Environment::Variable* variable =
+ checkCommand->mutable_environment()->add_variables();
+ variable->set_name("STATUS");
+ variable->set_value("1");
+
+ v1::TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+ launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+ AWAIT_READY(updateTaskRunning);
+ const v1::TaskStatus& taskRunning = updateTaskRunning->status();
+
+ ASSERT_EQ(TASK_RUNNING, taskRunning.state());
+ EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
+ EXPECT_TRUE(taskRunning.has_check_status());
+ EXPECT_TRUE(taskRunning.check_status().has_command());
+ EXPECT_FALSE(taskRunning.check_status().command().has_exit_code());
+
+ acknowledge(&mesos, frameworkId, taskRunning);
+
+ AWAIT_READY(updateCheckResult);
+ const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResult.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResult.reason());
+ EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+ EXPECT_TRUE(checkResult.has_check_status());
+ EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+ EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, checkResult);
+
+ // Trigger explicit reconciliation.
+ reconcile(
+ &mesos,
+ frameworkId,
+ {std::make_pair(checkResult.task_id(), checkResult.agent_id())});
+
+ AWAIT_READY(updateExplicitReconciliation);
+ const v1::TaskStatus& explicitReconciliation =
+ updateExplicitReconciliation->status();
+
+ ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_RECONCILIATION,
+ explicitReconciliation.reason());
+ EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id());
+ EXPECT_TRUE(explicitReconciliation.has_check_status());
+ EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code());
+ EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, explicitReconciliation);
+
+ // Trigger implicit reconciliation.
+ reconcile(&mesos, frameworkId, {});
+
+ AWAIT_READY(updateImplicitReconciliation);
+ const v1::TaskStatus& implicitReconciliation =
+ updateImplicitReconciliation->status();
+
+ ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_RECONCILIATION,
+ implicitReconciliation.reason());
+ EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+ EXPECT_TRUE(implicitReconciliation.has_check_status());
+ EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+ EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+ // Cleanup all mesos launched containers.
+ Future<hashset<ContainerID>> containerIds = containerizer->containers();
+ AWAIT_READY(containerIds);
+
+ EXPECT_CALL(*scheduler, disconnected(_));
+
+ teardown(&mesos, frameworkId);
+
+ foreach (const ContainerID& containerId, containerIds.get()) {
+ AWAIT_READY(containerizer->wait(containerId));
+ }
+}
+
+
+// Verifies that a command check's status changes are delivered.
+//
+// TODO(alexr): When check mocking is available, ensure that *only*
+// status changes are delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+ DefaultExecutorCheckTest,
+ CommandCheckStatusChange)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Disable AuthN on the agent.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ Fetcher fetcher;
+
+ // We have to explicitly create a `Containerizer` in non-local mode,
+ // because `LaunchNestedContainerSession` (used by command checks)
+ // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> agent =
+ StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(agent);
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ const v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ v1::ExecutorInfo executorInfo;
+ executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+ executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+ executorInfo.mutable_resources()->CopyFrom(resources);
+ executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+ Seconds(10).ns());
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ subscribe(&mesos, frameworkInfo);
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ // Update `executorInfo` with the subscribed `frameworkId`.
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID agentId = offer.agent_id();
+
+ Future<Event::Update> updateTaskRunning;
+ Future<Event::Update> updateCheckResult;
+ Future<Event::Update> updateCheckResultChanged;
+ Future<Event::Update> updateCheckResultBack;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskRunning))
+ .WillOnce(FutureArg<1>(&updateCheckResult))
+ .WillOnce(FutureArg<1>(&updateCheckResultChanged))
+ .WillOnce(FutureArg<1>(&updateCheckResultBack))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+ v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+ checkInfo->set_type(v1::CheckInfo::COMMAND);
+ checkInfo->set_delay_seconds(0);
+ checkInfo->set_interval_seconds(0);
+ checkInfo->mutable_command()->mutable_command()->set_value(
+ FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+ v1::TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+ launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+ AWAIT_READY(updateTaskRunning);
+ ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+ AWAIT_READY(updateCheckResult);
+ const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResult.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResult.reason());
+ EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+ EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, checkResult);
+
+ AWAIT_READY(updateCheckResultChanged);
+ const v1::TaskStatus& checkResultChanged = updateCheckResultChanged->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResultChanged.reason());
+ EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
+ EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, checkResultChanged);
+
+ AWAIT_READY(updateCheckResultBack);
+ const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResultBack.reason());
+ EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
+ EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());
+
+ // Cleanup all mesos launched containers.
+ Future<hashset<ContainerID>> containerIds = containerizer->containers();
+ AWAIT_READY(containerIds);
+
+ EXPECT_CALL(*scheduler, disconnected(_));
+
+ teardown(&mesos, frameworkId);
+
+ foreach (const ContainerID& containerId, containerIds.get()) {
+ AWAIT_READY(containerizer->wait(containerId));
+ }
+}
+
+
+// Verifies that when a command check times out after a successful check,
+// an empty check status update is delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Disable AuthN on the agent.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ Fetcher fetcher;
+
+ // We have to explicitly create a `Containerizer` in non-local mode,
+ // because `LaunchNestedContainerSession` (used by command checks)
+ // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> agent =
+ StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(agent);
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ const v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ v1::ExecutorInfo executorInfo;
+ executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+ executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+ executorInfo.mutable_resources()->CopyFrom(resources);
+ executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+ Seconds(10).ns());
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ subscribe(&mesos, frameworkInfo);
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ // Update `executorInfo` with the subscribed `frameworkId`.
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID agentId = offer.agent_id();
+
+ Future<Event::Update> updateTaskRunning;
+ Future<Event::Update> updateCheckResult;
+ Future<Event::Update> updateCheckResultTimeout;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskRunning))
+ .WillOnce(FutureArg<1>(&updateCheckResult))
+ .WillOnce(FutureArg<1>(&updateCheckResultTimeout))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+ v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+ checkInfo->set_type(v1::CheckInfo::COMMAND);
+ checkInfo->set_delay_seconds(0);
+ checkInfo->set_interval_seconds(0);
+ checkInfo->set_timeout_seconds(1);
+ checkInfo->mutable_command()->mutable_command()->set_value(
+ STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+ v1::TaskGroupInfo taskGroup;
+ taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+ launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+ AWAIT_READY(updateTaskRunning);
+ ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+ AWAIT_READY(updateCheckResult);
+ const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResult.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResult.reason());
+ EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+ EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, checkResult);
+
+ AWAIT_READY(updateCheckResultTimeout);
+ const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResultTimeout.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResultTimeout.reason());
+ EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());
+
+ // Cleanup all mesos launched containers.
+ Future<hashset<ContainerID>> containerIds = containerizer->containers();
+ AWAIT_READY(containerIds);
+
+ EXPECT_CALL(*scheduler, disconnected(_));
+
+ teardown(&mesos, frameworkId);
+
+ foreach (const ContainerID& containerId, containerIds.get()) {
+ AWAIT_READY(containerizer->wait(containerId));
+ }
+}
+
+
+// Verifies that when both command check and health check are specified,
+// health and check updates include both statuses. Also verifies that
+// both statuses are included upon reconciliation.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
+{
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ // Disable AuthN on the agent.
+ slave::Flags flags = CreateSlaveFlags();
+ flags.authenticate_http_readwrite = false;
+
+ Fetcher fetcher;
+
+ // We have to explicitly create a `Containerizer` in non-local mode,
+ // because `LaunchNestedContainerSession` (used by command checks)
+ // tries to start an IO switchboard, which doesn't work in local mode yet.
+ Try<MesosContainerizer*> _containerizer =
+ MesosContainerizer::create(flags, false, &fetcher);
+
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::Containerizer> containerizer(_containerizer.get());
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Try<Owned<cluster::Slave>> agent =
+ StartSlave(detector.get(), containerizer.get(), flags);
+ ASSERT_SOME(agent);
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+ const v1::Resources resources =
+ v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+ v1::ExecutorInfo executorInfo;
+ executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+ executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+ executorInfo.mutable_resources()->CopyFrom(resources);
+ executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+ Seconds(10).ns());
+
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ Future<Nothing> connected;
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(FutureSatisfy(&connected))
+ .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid,
+ ContentType::PROTOBUF,
+ scheduler);
+
+ AWAIT_READY(connected);
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ subscribe(&mesos, frameworkInfo);
+
+ AWAIT_READY(subscribed);
+
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ // Update `executorInfo` with the subscribed `frameworkId`.
+ executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+ AWAIT_READY(offers);
+ EXPECT_NE(0, offers->offers().size());
+ const v1::Offer& offer = offers->offers(0);
+ const v1::AgentID agentId = offer.agent_id();
+
+ Future<Event::Update> updateTaskRunning;
+ Future<Event::Update> updateCheckResult;
+ Future<Event::Update> updateHealthResult;
+ Future<Event::Update> updateImplicitReconciliation;
+
+ EXPECT_CALL(*scheduler, update(_, _))
+ .WillOnce(FutureArg<1>(&updateTaskRunning))
+ .WillOnce(FutureArg<1>(&updateCheckResult))
+ .WillOnce(FutureArg<1>(&updateHealthResult))
+ .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+ .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+ v1::TaskInfo taskInfo =
+ v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+ // Set both check and health check interval to an increased value to
+ // prevent a second update coming before reconciliation response.
+ int interval = 10;
+
+ v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+ checkInfo->set_type(v1::CheckInfo::COMMAND);
+ checkInfo->set_delay_seconds(0);
+ checkInfo->set_interval_seconds(interval);
+ checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
+
+ // Delay health check for 1s to ensure health update comes after check update.
+ //
+ // TODO(alexr): This can lead to flakiness on busy agents. A more robust
+ // approach could be setting the grace period to MAX_INT, and make the
+ // health check pass iff a file created by the check exists. Alternatively,
+ // we can relax the expectation that the check update is delivered first.
+ v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
+ healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
+ healthCheckInfo->set_delay_seconds(1);
+ healthCheckInfo->set_interval_seconds(interval);
+ healthCheckInfo->mutable_command()->set_value("exit 0");
+
+ launchTask(&mesos, offer, taskInfo);
+
+ AWAIT_READY(updateTaskRunning);
+ ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+ EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+ acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+ AWAIT_READY(updateCheckResult);
+ const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+ ASSERT_EQ(TASK_RUNNING, checkResult.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+ checkResult.reason());
+ EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+ EXPECT_FALSE(checkResult.has_healthy());
+ EXPECT_TRUE(checkResult.has_check_status());
+ EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+ EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, checkResult);
+
+ AWAIT_READY(updateHealthResult);
+ const v1::TaskStatus& healthResult = updateHealthResult->status();
+
+ ASSERT_EQ(TASK_RUNNING, healthResult.state());
+ EXPECT_EQ(taskInfo.task_id(), healthResult.task_id());
+ EXPECT_TRUE(healthResult.has_healthy());
+ EXPECT_TRUE(healthResult.healthy());
+ EXPECT_TRUE(healthResult.has_check_status());
+ EXPECT_TRUE(healthResult.check_status().command().has_exit_code());
+ EXPECT_EQ(1, healthResult.check_status().command().exit_code());
+
+ acknowledge(&mesos, frameworkId, healthResult);
+
+ // Trigger implicit reconciliation.
+ reconcile(&mesos, frameworkId, {});
+
+ AWAIT_READY(updateImplicitReconciliation);
+ const v1::TaskStatus& implicitReconciliation =
+ updateImplicitReconciliation->status();
+
+ ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+ ASSERT_EQ(
+ v1::TaskStatus::REASON_RECONCILIATION,
+ implicitReconciliation.reason());
+ EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+ EXPECT_TRUE(implicitReconciliation.has_healthy());
+ EXPECT_TRUE(implicitReconciliation.healthy());
+ EXPECT_TRUE(implicitReconciliation.has_check_status());
+ EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+ EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+ // Cleanup all mesos launched containers.
+ Future<hashset<ContainerID>> containerIds = containerizer->containers();
+ AWAIT_READY(containerIds);
+
+ EXPECT_CALL(*scheduler, disconnected(_));
+
+ teardown(&mesos, frameworkId);
+
+ foreach (const ContainerID& containerId, containerIds.get()) {
+ AWAIT_READY(containerizer->wait(containerId));
+ }
+}
+
// Verifies that an HTTP check is supported by the default executor and
// its status is delivered in a task status update.
[2/4] mesos git commit: Enabled pause/resume for general checks.
Posted by al...@apache.org.
Enabled pause/resume for general checks.
Review: https://reviews.apache.org/r/57912/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a689ab5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a689ab5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a689ab5
Branch: refs/heads/master
Commit: 3a689ab552a9ff23dd912e0178d3c5af393f7e84
Parents: 5c9ce37
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 13:41:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200
----------------------------------------------------------------------
src/checks/checker.cpp | 52 ++++++++++++++++++++++++++++++++--
src/checks/checker.hpp | 7 ++---
src/launcher/default_executor.cpp | 22 +++++++++-----
src/launcher/executor.cpp | 4 +--
4 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 1664acd..d1e9083 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -127,6 +127,9 @@ public:
const Option<pid_t>& _taskPid,
const std::vector<std::string>& _namespaces);
+ void pause();
+ void resume();
+
virtual ~CheckerProcess() {}
protected:
@@ -167,6 +170,7 @@ private:
Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
CheckStatusInfo previousCheckStatus;
+ bool paused;
};
@@ -208,9 +212,15 @@ Checker::~Checker()
}
-void Checker::stop()
+void Checker::pause()
+{
+ dispatch(process.get(), &CheckerProcess::pause);
+}
+
+
+void Checker::resume()
{
- terminate(process.get(), true);
+ dispatch(process.get(), &CheckerProcess::resume);
}
@@ -225,7 +235,8 @@ CheckerProcess::CheckerProcess(
updateCallback(_callback),
taskId(_taskId),
taskPid(_taskPid),
- namespaces(_namespaces)
+ namespaces(_namespaces),
+ paused(false)
{
Try<Duration> create = Duration::create(check.delay_seconds());
CHECK_SOME(create);
@@ -286,6 +297,10 @@ void CheckerProcess::finalize()
void CheckerProcess::performCheck()
{
+ if (paused) {
+ return;
+ }
+
Stopwatch stopwatch;
stopwatch.start();
@@ -314,16 +329,47 @@ void CheckerProcess::performCheck()
void CheckerProcess::scheduleNext(const Duration& duration)
{
+ CHECK(!paused);
+
VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration;
delay(duration, self(), &Self::performCheck);
}
+void CheckerProcess::pause()
+{
+ if (!paused) {
+ VLOG(1) << "Checking for task '" << taskId << "' paused";
+
+ paused = true;
+ }
+}
+
+
+void CheckerProcess::resume()
+{
+ if (paused) {
+ VLOG(1) << "Checking for task '" << taskId << "' resumed";
+
+ paused = false;
+
+ // Schedule a check immediately.
+ scheduleNext(Duration::zero());
+ }
+}
+
void CheckerProcess::processCheckResult(
const Stopwatch& stopwatch,
const CheckStatusInfo& result)
{
+ // `Checker` might have been paused while performing the check.
+ if (paused) {
+ LOG(INFO) << "Ignoring " << check.type() << " check result for"
+ << " task '" << taskId << "': checking is paused";
+ return;
+ }
+
VLOG(1) << "Performed " << check.type() << " check"
<< " for task '" << taskId << "' in " << stopwatch.elapsed();
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index e8af316..1521b9c 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -68,10 +68,9 @@ public:
Checker(const Checker&) = delete;
Checker& operator=(const Checker&) = delete;
- /**
- * Immediately stops checking. Any in-flight checks are dropped.
- */
- void stop();
+ // Idempotent helpers for pausing and resuming checking.
+ void pause();
+ void resume();
private:
explicit Checker(process::Owned<CheckerProcess> process);
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 606fd9c..79785fc 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -163,8 +163,12 @@ public:
}
}
- // Pause all health checks.
+ // Pause all checks and health checks.
foreachvalue (Owned<Container> container, containers) {
+ if (container->checker.isSome()) {
+ container->checker->get()->pause();
+ }
+
if (container->healthChecker.isSome()) {
container->healthChecker->get()->pause();
}
@@ -193,8 +197,12 @@ public:
wait(containers.keys());
}
- // Resume all health checks.
+ // Resume all checks and health checks.
foreachvalue (Owned<Container> container, containers) {
+ if (container->checker.isSome()) {
+ container->checker->get()->resume();
+ }
+
if (container->healthChecker.isSome()) {
container->healthChecker->get()->resume();
}
@@ -738,11 +746,11 @@ protected:
deserialize<agent::Response>(contentType, response->body);
CHECK_SOME(waitResponse);
- // If there is an associated checker with the task, stop it to
- // avoid sending check updates after a terminal status update.
+ // If the task is checked, pause the associated checker to avoid
+ // sending check updates after a terminal status update.
if (container->checker.isSome()) {
CHECK_NOTNULL(container->checker->get());
- container->checker->get()->stop();
+ container->checker->get()->pause();
container->checker = None();
}
@@ -931,13 +939,13 @@ protected:
CHECK(!container->killing);
container->killing = true;
- // If the task is checked, stop the associated checker.
+ // If the task is checked, pause the associated checker.
//
// TODO(alexr): Once we support `TASK_KILLING` in this executor,
// consider continuing checking the task after sending `TASK_KILLING`.
if (container->checker.isSome()) {
CHECK_NOTNULL(container->checker->get());
- container->checker->get()->stop();
+ container->checker->get()->pause();
container->checker = None();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 8bd266e..bc69beb 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -731,7 +731,7 @@ private:
// Stop checking the task.
if (checker.get() != nullptr) {
- checker->stop();
+ checker->pause();
}
// Stop health checking the task.
@@ -776,7 +776,7 @@ private:
// Stop checking the task.
if (checker.get() != nullptr) {
- checker->stop();
+ checker->pause();
}
// Stop health checking the task.
[4/4] mesos git commit: Improved log/failure messages in the
(health)checker libraries.
Posted by al...@apache.org.
Improved log/failure messages in the (health)checker libraries.
- Make log/failure messages consistent across both libraries.
- Task and container IDs are user generated and can contain spaces, so
we have to wrap them in single quotes.
- Remove the redundant task IDs from 'Failure' messages.
Review: https://reviews.apache.org/r/57854/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c9ce378
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c9ce378
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c9ce378
Branch: refs/heads/master
Commit: 5c9ce378f627cbd4c2ed16f1e342dde16d5ee939
Parents: 080e1b7
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 12:58:38 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200
----------------------------------------------------------------------
src/checks/checker.cpp | 60 +++++++-------
src/checks/health_checker.cpp | 166 ++++++++++++++++++++-----------------
src/tests/check_tests.cpp | 6 +-
3 files changed, 121 insertions(+), 111 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 3f2d8d8..1664acd 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -102,13 +102,12 @@ static pid_t cloneWithSetns(
Try<Nothing> setns = ns::setns(taskPid.get(), ns);
if (setns.isError()) {
// This effectively aborts the check.
- LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
- << "task (pid: '" << taskPid.get() << "'): "
- << setns.error();
+ LOG(FATAL) << "Failed to enter the " << ns << " namespace of task"
+ << " (pid: " << taskPid.get() << "): " << setns.error();
}
- VLOG(1) << "Entered the " << ns << " namespace of "
- << "task (pid: '" << taskPid.get() << "') successfully";
+ VLOG(1) << "Entered the " << ns << " namespace of task"
+ << " (pid: " << taskPid.get() << ") successfully";
}
}
@@ -272,7 +271,7 @@ CheckerProcess::CheckerProcess(
void CheckerProcess::initialize()
{
- VLOG(1) << "Check configuration for task " << taskId << ":"
+ VLOG(1) << "Check configuration for task '" << taskId << "':"
<< " '" << jsonify(JSON::Protobuf(check)) << "'";
scheduleNext(checkDelay);
@@ -281,7 +280,7 @@ void CheckerProcess::initialize()
void CheckerProcess::finalize()
{
- LOG(INFO) << "Checking for task " << taskId << " stopped";
+ LOG(INFO) << "Checking for task '" << taskId << "' stopped";
}
@@ -315,7 +314,7 @@ void CheckerProcess::performCheck()
void CheckerProcess::scheduleNext(const Duration& duration)
{
- VLOG(1) << "Scheduling check for task " << taskId << " in " << duration;
+ VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration;
delay(duration, self(), &Self::performCheck);
}
@@ -325,8 +324,8 @@ void CheckerProcess::processCheckResult(
const Stopwatch& stopwatch,
const CheckStatusInfo& result)
{
- VLOG(1) << "Performed " << check.type() << " check for task " << taskId
- << " in " << stopwatch.elapsed();
+ VLOG(1) << "Performed " << check.type() << " check"
+ << " for task '" << taskId << "' in " << stopwatch.elapsed();
// Trigger the callback if check info changes.
if (result != previousCheckStatus) {
@@ -360,8 +359,8 @@ Future<int> CheckerProcess::commandCheck()
if (command.shell()) {
// Use the shell variant.
- VLOG(1) << "Launching command check '" << command.value() << "'"
- << " for task " << taskId;
+ VLOG(1) << "Launching COMMAND check '" << command.value() << "'"
+ << " for task '" << taskId << "'";
s = process::subprocess(
command.value(),
@@ -375,8 +374,8 @@ Future<int> CheckerProcess::commandCheck()
vector<string> argv(
std::begin(command.arguments()), std::end(command.arguments()));
- VLOG(1) << "Launching command check [" << command.value() << ", "
- << strings::join(", ", argv) << "] for task " << taskId;
+ VLOG(1) << "Launching COMMAND check [" << command.value() << ", "
+ << strings::join(", ", argv) << "] for task '" << taskId << "'";
s = process::subprocess(
command.value(),
@@ -407,14 +406,13 @@ Future<int> CheckerProcess::commandCheck()
if (commandPid != -1) {
// Cleanup the external command process.
- VLOG(1) << "Killing the command check process " << commandPid
- << " for task " << _taskId;
+ VLOG(1) << "Killing the COMMAND check process '" << commandPid
+ << "' for task '" << _taskId << "'";
os::killtree(commandPid, SIGKILL);
}
- return Failure(
- "Command timed out after " + stringify(timeout) + "; aborting");
+ return Failure("Command timed out after " + stringify(timeout));
})
.then([](const Option<int>& exitCode) -> Future<int> {
if (exitCode.isNone()) {
@@ -441,15 +439,16 @@ void CheckerProcess::processCommandCheckResult(
// see MESOS-7242.
if (result.isReady() && WIFEXITED(result.get())) {
const int exitCode = WEXITSTATUS(result.get());
- VLOG(1) << check.type() << " check for task "
- << taskId << " returned " << exitCode;
+ VLOG(1) << check.type() << " check for task '"
+ << taskId << "' returned: " << exitCode;
checkStatusInfo.mutable_command()->set_exit_code(
static_cast<int32_t>(exitCode));
} else {
// Check's status is currently not available, which may indicate a change
// that should be reported as an empty `CheckStatusInfo.Command` message.
- LOG(WARNING) << "Check for task " << taskId << " failed: "
+ LOG(WARNING) << check.type() << " check for task '" << taskId
+ << "' failed: "
<< (result.isFailed() ? result.failure() : "discarded");
checkStatusInfo.mutable_command();
@@ -471,7 +470,7 @@ Future<int> CheckerProcess::httpCheck()
const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
stringify(http.port()) + path;
- VLOG(1) << "Launching HTTP check '" << url << "' for task " << taskId;
+ VLOG(1) << "Launching HTTP check '" << url << "' for task '" << taskId << "'";
const vector<string> argv = {
HTTP_CHECK_COMMAND,
@@ -522,14 +521,14 @@ Future<int> CheckerProcess::httpCheck()
if (curlPid != -1) {
// Cleanup the HTTP_CHECK_COMMAND process.
VLOG(1) << "Killing the HTTP check process " << curlPid
- << " for task " << _taskId;
+ << " for task '" << _taskId << "'";
os::killtree(curlPid, SIGKILL);
}
return Failure(
string(HTTP_CHECK_COMMAND) + " timed out after " +
- stringify(timeout) + "; aborting");
+ stringify(timeout));
})
.then(defer(self(), &Self::_httpCheck, lambda::_1));
}
@@ -595,15 +594,15 @@ void CheckerProcess::processHttpCheckResult(
checkStatusInfo.set_type(check.type());
if (result.isReady()) {
- VLOG(1) << check.type() << " check for task "
- << taskId << " returned " << result.get();
+ VLOG(1) << check.type() << " check for task '"
+ << taskId << "' returned: " << result.get();
checkStatusInfo.mutable_http()->set_status_code(
static_cast<uint32_t>(result.get()));
} else {
// Check's status is currently not available, which may indicate a change
// that should be reported as an empty `CheckStatusInfo.Http` message.
- LOG(WARNING) << "Check for task " << taskId << " failed: "
+ LOG(WARNING) << "Check for task '" << taskId << "' failed: "
<< (result.isFailed() ? result.failure() : "discarded");
checkStatusInfo.mutable_http();
@@ -623,7 +622,7 @@ Option<Error> checkInfo(const CheckInfo& checkInfo)
switch (checkInfo.type()) {
case CheckInfo::COMMAND: {
if (!checkInfo.has_command()) {
- return Error("Expecting 'command' to be set for command check");
+ return Error("Expecting 'command' to be set for COMMAND check");
}
const CommandInfo& command = checkInfo.command().command();
@@ -656,8 +655,7 @@ Option<Error> checkInfo(const CheckInfo& checkInfo)
if (http.has_path() && !strings::startsWith(http.path(), '/')) {
return Error(
- "The path '" + http.path() +
- "' of HTTP check must start with '/'");
+ "The path '" + http.path() + "' of HTTP check must start with '/'");
}
break;
@@ -696,7 +694,7 @@ Option<Error> checkStatusInfo(const CheckStatusInfo& checkStatusInfo)
case CheckInfo::COMMAND: {
if (!checkStatusInfo.has_command()) {
return Error(
- "Expecting 'command' to be set for command check's status");
+ "Expecting 'command' to be set for COMMAND check's status");
}
break;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index b768c5b..769278c 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -116,13 +116,12 @@ static pid_t cloneWithSetns(
Try<Nothing> setns = ns::setns(taskPid.get(), ns);
if (setns.isError()) {
// This effectively aborts the health check.
- LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
- << "task (pid: '" << taskPid.get() << "'): "
- << setns.error();
+ LOG(FATAL) << "Failed to enter the " << ns << " namespace of task"
+ << " (pid: '" << taskPid.get() << "'): " << setns.error();
}
- VLOG(1) << "Entered the " << ns << " namespace of "
- << "task (pid: '" << taskPid.get() << "') successfully";
+ VLOG(1) << "Entered the " << ns << " namespace of task"
+ << " (pid: '" << taskPid.get() << "') successfully";
}
}
@@ -269,7 +268,7 @@ HealthCheckerProcess::HealthCheckerProcess(
void HealthCheckerProcess::initialize()
{
- VLOG(1) << "Health check configuration:"
+ VLOG(1) << "Health check configuration for task '" << taskId << "':"
<< " '" << jsonify(JSON::Protobuf(check)) << "'";
startTime = Clock::now();
@@ -283,14 +282,17 @@ void HealthCheckerProcess::failure(const string& message)
if (initializing &&
checkGracePeriod.secs() > 0 &&
(Clock::now() - startTime) <= checkGracePeriod) {
- LOG(INFO) << "Ignoring failure as health check still in grace period";
+ LOG(INFO) << "Ignoring failure of "
+ << HealthCheck::Type_Name(check.type()) << " health check for"
+ << " task '" << taskId << "': still in grace period";
scheduleNext(checkInterval);
return;
}
consecutiveFailures++;
- LOG(WARNING) << "Health check failed " << consecutiveFailures
- << " times consecutively: " << message;
+ LOG(WARNING) << HealthCheck::Type_Name(check.type())
+ << " health check for task '" << taskId << "' failed "
+ << consecutiveFailures << " times consecutively: " << message;
bool killTask = consecutiveFailures >= check.consecutive_failures();
@@ -314,7 +316,8 @@ void HealthCheckerProcess::failure(const string& message)
void HealthCheckerProcess::success()
{
- VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check passed";
+ VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check for task '"
+ << taskId << "' passed";
// Send a healthy status update on the first success,
// and on the first success following failure(s).
@@ -376,20 +379,22 @@ void HealthCheckerProcess::processCheckResult(
{
// `HealthChecker` might have been paused while performing the check.
if (paused) {
- LOG(INFO) << "Ignoring health check result of task " + stringify(taskId) +
- " (health checking is paused)";
+ LOG(INFO) << "Ignoring " << HealthCheck::Type_Name(check.type())
+ << " health check result for task '" << taskId
+ << "': health checking is paused";
return;
}
if (future.isDiscarded()) {
- LOG(INFO) << HealthCheck::Type_Name(check.type()) +
- " health check of task " + stringify(taskId) + " discarded";
+ LOG(INFO) << HealthCheck::Type_Name(check.type()) << " health check for"
+ << " task '" << taskId << "' discarded";
scheduleNext(checkInterval);
return;
}
VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type())
- << " health check in " << stopwatch.elapsed();
+ << " health check for task '" << taskId << "' in "
+ << stopwatch.elapsed();
if (future.isReady()) {
success();
@@ -397,7 +402,8 @@ void HealthCheckerProcess::processCheckResult(
}
string message = HealthCheck::Type_Name(check.type()) +
- " health check failed: " + future.failure();
+ " health check for task '" + stringify(taskId) +
+ "' failed: " + future.failure();
failure(message);
}
@@ -422,7 +428,8 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
if (command.shell()) {
// Use the shell variant.
- VLOG(1) << "Launching command health check '" << command.value() << "'";
+ VLOG(1) << "Launching COMMAND health check '" << command.value() << "'"
+ << " for task '" << taskId << "'";
external = subprocess(
command.value(),
@@ -438,8 +445,8 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
argv.push_back(arg);
}
- VLOG(1) << "Launching command health check [" << command.value() << ", "
- << strings::join(", ", argv) << "]";
+ VLOG(1) << "Launching COMMAND health check [" << command.value() << ", "
+ << strings::join(", ", argv) << "] for task '" << taskId << "'";
external = subprocess(
command.value(),
@@ -456,24 +463,27 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
return Failure("Failed to create subprocess: " + external.error());
}
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once they are available.
pid_t commandPid = external->pid();
const Duration timeout = checkTimeout;
+ const TaskID _taskId = taskId;
return external->status()
.after(
timeout,
- [timeout, commandPid](Future<Option<int>> future) {
+ [timeout, commandPid, _taskId](Future<Option<int>> future) {
future.discard();
if (commandPid != -1) {
// Cleanup the external command process.
- VLOG(1) << "Killing the command health check process " << commandPid;
+ VLOG(1) << "Killing the COMMAND health check process '" << commandPid
+ << "' for task '" << _taskId << "'";
os::killtree(commandPid, SIGKILL);
}
- return Failure(
- "Command timed out after " + stringify(timeout) + "; aborting");
+ return Failure("Command timed out after " + stringify(timeout));
})
.then([](const Option<int>& status) -> Future<Nothing> {
if (status.isNone()) {
@@ -482,7 +492,7 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
int statusCode = status.get();
if (statusCode != 0) {
- return Failure("Command returned " + WSTRINGIFY(statusCode));
+ return Failure("Command returned: " + WSTRINGIFY(statusCode));
}
return Nothing();
@@ -497,7 +507,7 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
CHECK(check.has_command());
CHECK_SOME(agentURL);
- VLOG(1) << "Launching command health check of task " << stringify(taskId);
+ VLOG(1) << "Launching COMMAND health check for task '" << taskId << "'";
// We don't want recoverable errors, e.g., the agent responding with
// HTTP status code 503, to trigger a health check failure.
@@ -526,11 +536,12 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
{"Content-Type", stringify(ContentType::PROTOBUF)}};
process::http::request(request, false)
- .onFailed(defer(self(), [this, promise](const string& failure) {
- LOG(WARNING) << "Connection to remove the nested container "
- << stringify(previousCheckContainerId.get())
- << " used for the command health check of task "
- << stringify(taskId) << " failed: " << failure;
+ .onFailed(defer(self(),
+ [this, promise](const string& failure) {
+ LOG(WARNING) << "Connection to remove the nested container '"
+ << previousCheckContainerId.get()
+ << "' used for the COMMAND health check for task '"
+ << taskId << "' failed: " << failure;
// Something went wrong while sending the request, we treat this
// as a transient failure and discard the promise.
@@ -539,14 +550,12 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
.onReady(defer(self(), [this, promise](const Response& response) {
if (response.code != process::http::Status::OK) {
// The agent was unable to remove the health check container,
- // we treat this as a transient failure and discard the
- // promise.
+ // we treat this as a transient failure and discard the promise.
LOG(WARNING) << "Received '" << response.status << "' ("
- << response.body
- << ") while removing the nested container "
- << stringify(previousCheckContainerId.get())
- << " used for the COMMAND health check for task"
- << stringify(taskId);
+ << response.body << ") while removing the nested"
+ << " container '" << previousCheckContainerId.get()
+ << "' used for the COMMAND health check for task '"
+ << taskId << "'";
promise->discard();
}
@@ -612,8 +621,8 @@ void HealthCheckerProcess::__nestedCommandHealthCheck(
{"Message-Accept", stringify(ContentType::PROTOBUF)},
{"Content-Type", stringify(ContentType::PROTOBUF)}};
- // TODO(alexr): Use lambda named captures for
- // these cached values once they are available.
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
const Duration timeout = checkTimeout;
auto checkTimedOut = std::make_shared<bool>(false);
@@ -637,8 +646,7 @@ void HealthCheckerProcess::__nestedCommandHealthCheck(
*checkTimedOut = true;
- return Failure(
- "Command timed out after " + stringify(timeout) + "; aborting");
+ return Failure("Command timed out after " + stringify(timeout));
}))
.onFailed(defer(self(),
&Self::nestedCommandHealthCheckFailure,
@@ -664,27 +672,21 @@ void HealthCheckerProcess::___nestedCommandHealthCheck(
// The agent was unable to launch the health check container, we
// treat this as a transient failure.
LOG(WARNING) << "Received '" << launchResponse.status << "' ("
- << launchResponse.body << ") while launching command health "
- << "check of task " << stringify(taskId);
+ << launchResponse.body << ") while launching COMMAND health"
+ << " check for task '" << taskId << "'";
promise->discard();
return;
}
- // We need to make a copy so that the lambdas can capture it.
- const TaskID _taskId = taskId;
-
waitNestedContainer(checkContainerId)
- .onFailed([_taskId, promise](const string& failure) {
+ .onFailed([promise](const string& failure) {
promise->fail(
- "Unable to get the exit code of command health check of task " +
- stringify(_taskId) + ": " + failure);
+ "Unable to get the exit code: " + failure);
})
- .onReady([_taskId, promise](const Option<int>& status) -> void {
+ .onReady([promise](const Option<int>& status) -> void {
if (status.isNone()) {
- promise->fail(
- "Unable to get the exit code of command health check of task " +
- stringify(_taskId));
+ promise->fail("Unable to get the exit code");
// TODO(gkleiman): Make sure that the following block works on Windows.
} else if (WIFSIGNALED(status.get()) &&
WTERMSIG(status.get()) == SIGKILL) {
@@ -693,9 +695,7 @@ void HealthCheckerProcess::___nestedCommandHealthCheck(
// the result.
promise->discard();
} else if (status.get() != 0) {
- promise->fail(
- "Command health check of task " + stringify(_taskId) +
- " returned " + WSTRINGIFY(status.get()));
+ promise->fail("Command returned: " + WSTRINGIFY(status.get()));
} else {
promise->set(Nothing());
}
@@ -769,10 +769,10 @@ Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
{"Content-Type", stringify(ContentType::PROTOBUF)}};
return process::http::request(request, false)
- .repair([this](const Future<Response>& future) {
+ .repair([containerId](const Future<Response>& future) {
return Failure(
- "Connection to wait for a health check of task " +
- stringify(taskId) + " failed: " + future.failure());
+ "Connection to wait for health check container '" +
+ stringify(containerId) + "' failed: " + future.failure());
})
.then(defer(self(),
&Self::_waitNestedContainer, containerId, lambda::_1));
@@ -786,7 +786,8 @@ Future<Option<int>> HealthCheckerProcess::_waitNestedContainer(
if (httpResponse.code != process::http::Status::OK) {
return Failure(
"Received '" + httpResponse.status + "' (" + httpResponse.body +
- ") while waiting on health check of task " + stringify(taskId));
+ ") while waiting on health check container '" +
+ stringify(containerId) + "'");
}
Try<agent::Response> response =
@@ -814,7 +815,8 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
stringify(http.port()) + path;
- VLOG(1) << "Launching HTTP health check '" << url << "'";
+ VLOG(1) << "Launching HTTP health check '" << url << "'"
+ << " for task '" << taskId << "'";
const vector<string> argv = {
HTTP_CHECK_COMMAND,
@@ -845,8 +847,11 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
" subprocess: " + s.error());
}
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once they are available.
pid_t curlPid = s->pid();
const Duration timeout = checkTimeout;
+ const TaskID _taskId = taskId;
return await(
s->status(),
@@ -854,21 +859,22 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
process::io::read(s->err().get()))
.after(
timeout,
- [timeout, curlPid](Future<tuple<Future<Option<int>>,
- Future<string>,
- Future<string>>> future) {
+ [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>,
+ Future<string>,
+ Future<string>>> future) {
future.discard();
if (curlPid != -1) {
// Cleanup the HTTP_CHECK_COMMAND process.
- VLOG(1) << "Killing the HTTP health check process " << curlPid;
+ VLOG(1) << "Killing the HTTP health check process '" << curlPid
+ << "' for task '" << _taskId << "'";
os::killtree(curlPid, SIGKILL);
}
return Failure(
string(HTTP_CHECK_COMMAND) + " timed out after " +
- stringify(timeout) + "; aborting");
+ stringify(timeout));
})
.then(defer(self(), &Self::_httpHealthCheck, lambda::_1));
}
@@ -943,7 +949,8 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
const HealthCheck::TCPCheckInfo& tcp = check.tcp();
- VLOG(1) << "Launching TCP health check at port '" << tcp.port() << "'";
+ VLOG(1) << "Launching TCP health check for task '" << taskId << "' at port"
+ << tcp.port();
const string tcpConnectPath = path::join(launcherDir, TCP_CHECK_COMMAND);
@@ -971,8 +978,11 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
" subprocess: " + s.error());
}
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once they are available.
pid_t tcpConnectPid = s->pid();
const Duration timeout = checkTimeout;
+ const TaskID _taskId = taskId;
return await(
s->status(),
@@ -980,21 +990,22 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
process::io::read(s->err().get()))
.after(
timeout,
- [timeout, tcpConnectPid](Future<tuple<Future<Option<int>>,
- Future<string>,
- Future<string>>> future) {
+ [timeout, tcpConnectPid, _taskId](Future<tuple<Future<Option<int>>,
+ Future<string>,
+ Future<string>>> future)
+ {
future.discard();
if (tcpConnectPid != -1) {
// Cleanup the TCP_CHECK_COMMAND process.
- VLOG(1) << "Killing the TCP health check process " << tcpConnectPid;
+ VLOG(1) << "Killing the TCP health check process " << tcpConnectPid
+ << " for task '" << _taskId << "'";
os::killtree(tcpConnectPid, SIGKILL);
}
return Failure(
- string(TCP_CHECK_COMMAND) + " timed out after " +
- stringify(timeout) + "; aborting");
+ string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout));
})
.then(defer(self(), &Self::_tcpHealthCheck, lambda::_1));
}
@@ -1041,7 +1052,8 @@ void HealthCheckerProcess::scheduleNext(const Duration& duration)
{
CHECK(!paused);
- VLOG(1) << "Scheduling health check in " << duration;
+ VLOG(1) << "Scheduling health check for task '" << taskId << "' in "
+ << duration;
delay(duration, self(), &Self::performSingleCheck);
}
@@ -1050,7 +1062,7 @@ void HealthCheckerProcess::scheduleNext(const Duration& duration)
void HealthCheckerProcess::pause()
{
if (!paused) {
- VLOG(1) << "Health checking paused";
+ VLOG(1) << "Health checking for task '" << taskId << "' paused";
paused = true;
}
@@ -1060,7 +1072,7 @@ void HealthCheckerProcess::pause()
void HealthCheckerProcess::resume()
{
if (paused) {
- VLOG(1) << "Health checking resumed";
+ VLOG(1) << "Health checking for task '" << taskId << "' resumed";
paused = false;
@@ -1081,7 +1093,7 @@ Option<Error> healthCheck(const HealthCheck& check)
switch (check.type()) {
case HealthCheck::COMMAND: {
if (!check.has_command()) {
- return Error("Expecting 'command' to be set for command health check");
+ return Error("Expecting 'command' to be set for COMMAND health check");
}
const CommandInfo& command = check.command();
http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 16f1c07..78ac498 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -1028,7 +1028,7 @@ TEST_F(CheckTest, CheckInfoValidation)
Option<Error> validate = validation::checkInfo(checkInfo);
EXPECT_SOME(validate);
EXPECT_EQ(
- "Expecting 'command' to be set for command check",
+ "Expecting 'command' to be set for COMMAND check",
validate->message);
checkInfo.set_type(CheckInfo::HTTP);
@@ -1090,7 +1090,7 @@ TEST_F(CheckTest, CheckInfoValidation)
validate = validation::checkInfo(checkInfo);
EXPECT_SOME(validate);
EXPECT_EQ(
- "The path 'healthz' of HTTP check must start with '/'",
+ "The path 'healthz' of HTTP check must start with '/'",
validate->message);
}
@@ -1160,7 +1160,7 @@ TEST_F(CheckTest, CheckStatusInfoValidation)
Option<Error> validate = validation::checkStatusInfo(checkStatusInfo);
EXPECT_SOME(validate);
EXPECT_EQ(
- "Expecting 'command' to be set for command check's status",
+ "Expecting 'command' to be set for COMMAND check's status",
validate->message);
checkStatusInfo.set_type(CheckInfo::HTTP);