You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/30 17:35:40 UTC
[3/4] mesos git commit: Added support for COMMAND checks to the
default executor.
Added support for COMMAND checks to the default executor.
Review: https://reviews.apache.org/r/58030/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f81c6f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f81c6f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f81c6f6
Branch: refs/heads/master
Commit: 3f81c6f6052768e326e84e2eab93c20572b490ad
Parents: 3a689ab
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 17:14:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200
----------------------------------------------------------------------
src/checks/checker.cpp | 454 ++++++++++++++++++--
src/checks/checker.hpp | 32 +-
src/launcher/default_executor.cpp | 8 +-
src/tests/check_tests.cpp | 733 ++++++++++++++++++++++++++++++++-
4 files changed, 1181 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index d1e9083..e48e037 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -19,6 +19,7 @@
#include <cstdint>
#include <iterator>
#include <map>
+#include <memory>
#include <string>
#include <tuple>
#include <vector>
@@ -28,6 +29,8 @@
#include <mesos/mesos.hpp>
#include <mesos/type_utils.hpp>
+#include <mesos/agent/agent.hpp>
+
#include <process/collect.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
@@ -49,14 +52,18 @@
#include <stout/strings.hpp>
#include <stout/try.hpp>
#include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
#include <stout/os/environment.hpp>
#include <stout/os/killtree.hpp>
+#include "common/http.hpp"
#include "common/protobuf_utils.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
+#include "internal/evolve.hpp"
+
#ifdef __linux__
#include "linux/ns.hpp"
#endif
@@ -64,9 +71,14 @@
using process::Failure;
using process::Future;
using process::Owned;
+using process::Promise;
using process::Subprocess;
+using process::http::Connection;
+using process::http::Response;
+
using std::map;
+using std::shared_ptr;
using std::string;
using std::tuple;
using std::vector;
@@ -125,7 +137,10 @@ public:
const lambda::function<void(const CheckStatusInfo&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const std::vector<std::string>& _namespaces);
+ const vector<string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent);
void pause();
void resume();
@@ -141,9 +156,33 @@ private:
void scheduleNext(const Duration& duration);
void processCheckResult(
const Stopwatch& stopwatch,
- const CheckStatusInfo& result);
+ const Option<CheckStatusInfo>& result);
process::Future<int> commandCheck();
+
+ process::Future<int> nestedCommandCheck();
+ void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise);
+ void __nestedCommandCheck(
+ std::shared_ptr<process::Promise<int>> promise,
+ process::http::Connection connection);
+ void ___nestedCommandCheck(
+ std::shared_ptr<process::Promise<int>> promise,
+ const ContainerID& checkContainerId,
+ const process::http::Response& launchResponse);
+
+ void nestedCommandCheckFailure(
+ std::shared_ptr<process::Promise<int>> promise,
+ process::http::Connection connection,
+ ContainerID checkContainerId,
+ std::shared_ptr<bool> checkTimedOut,
+ const std::string& failure);
+
+ process::Future<Option<int>> waitNestedContainer(
+ const ContainerID& containerId);
+ process::Future<Option<int>> _waitNestedContainer(
+ const ContainerID& containerId,
+ const process::http::Response& httpResponse);
+
void processCommandCheckResult(
const Stopwatch& stopwatch,
const process::Future<int>& result);
@@ -167,10 +206,18 @@ private:
const TaskID taskId;
const Option<pid_t> taskPid;
const std::vector<std::string> namespaces;
+ const Option<ContainerID> taskContainerId;
+ const Option<process::http::URL> agentURL;
+ const bool commandCheckViaAgent;
+
Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
CheckStatusInfo previousCheckStatus;
bool paused;
+
+ // Contains the ID of the most recently terminated nested container
+ // that was used to perform a COMMAND check.
+ Option<ContainerID> previousCheckContainerId;
};
@@ -192,7 +239,37 @@ Try<Owned<Checker>> Checker::create(
callback,
taskId,
taskPid,
- namespaces));
+ namespaces,
+ None(),
+ None(),
+ false));
+
+ return Owned<Checker>(new Checker(process));
+}
+
+
+Try<process::Owned<Checker>> Checker::create(
+ const CheckInfo& check,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL)
+{
+ // Validate the `CheckInfo` protobuf.
+ Option<Error> error = validation::checkInfo(check);
+ if (error.isSome()) {
+ return error.get();
+ }
+
+ Owned<CheckerProcess> process(new CheckerProcess(
+ check,
+ callback,
+ taskId,
+ None(),
+ {},
+ taskContainerId,
+ agentURL,
+ true));
return Owned<Checker>(new Checker(process));
}
@@ -229,13 +306,19 @@ CheckerProcess::CheckerProcess(
const lambda::function<void(const CheckStatusInfo&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const vector<string>& _namespaces)
+ const vector<string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent)
: ProcessBase(process::ID::generate("checker")),
check(_check),
updateCallback(_callback),
taskId(_taskId),
taskPid(_taskPid),
namespaces(_namespaces),
+ taskContainerId(_taskContainerId),
+ agentURL(_agentURL),
+ commandCheckViaAgent(_commandCheckViaAgent),
paused(false)
{
Try<Duration> create = Duration::create(check.delay_seconds());
@@ -306,7 +389,9 @@ void CheckerProcess::performCheck()
switch (check.type()) {
case CheckInfo::COMMAND: {
- commandCheck().onAny(defer(
+ Future<int> future = commandCheckViaAgent ? nestedCommandCheck()
+ : commandCheck();
+ future.onAny(defer(
self(),
&Self::processCommandCheckResult, stopwatch, lambda::_1));
break;
@@ -361,7 +446,7 @@ void CheckerProcess::resume()
void CheckerProcess::processCheckResult(
const Stopwatch& stopwatch,
- const CheckStatusInfo& result)
+ const Option<CheckStatusInfo>& result)
{
// `Checker` might have been paused while performing the check.
if (paused) {
@@ -370,16 +455,20 @@ void CheckerProcess::processCheckResult(
return;
}
- VLOG(1) << "Performed " << check.type() << " check"
- << " for task '" << taskId << "' in " << stopwatch.elapsed();
-
- // Trigger the callback if check info changes.
- if (result != previousCheckStatus) {
- // We assume this is a local send, i.e., the checker library is not used
- // in a binary external to the executor and hence can not exit before
- // the data is sent to the executor.
- updateCallback(result);
- previousCheckStatus = result;
+ // `result` should be some if it was possible to perform the check,
+ // and empty if there was a transient error.
+ if (result.isSome()) {
+ VLOG(1) << "Performed " << check.type() << " check"
+ << " for task '" << taskId << "' in " << stopwatch.elapsed();
+
+ // Trigger the callback if check info changes.
+ if (result.get() != previousCheckStatus) {
+ // We assume this is a local send, i.e., the checker library is not used
+ // in a binary external to the executor and hence can not exit before
+ // the data is sent to the executor.
+ updateCallback(result.get());
+ previousCheckStatus = result.get();
+ }
}
scheduleNext(checkInterval);
@@ -470,37 +559,346 @@ Future<int> CheckerProcess::commandCheck()
}
+Future<int> CheckerProcess::nestedCommandCheck()
+{
+ CHECK_EQ(CheckInfo::COMMAND, check.type());
+ CHECK(check.has_command());
+ CHECK_SOME(taskContainerId);
+ CHECK_SOME(agentURL);
+
+ VLOG(1) << "Launching COMMAND check for task '" << taskId << "'";
+
+ // We don't want recoverable errors, e.g., the agent responding with
+ // HTTP status code 503, to trigger a check failure.
+ //
+ // The future returned by this method represents the result of a
+ // check. It will be set to the exit status of the check command if it
+ // succeeded, to a `Failure` if there was a non-transient error, and
+ // discarded if there was a transient error.
+ auto promise = std::make_shared<Promise<int>>();
+
+ if (previousCheckContainerId.isSome()) {
+ agent::Call call;
+ call.set_type(agent::Call::REMOVE_NESTED_CONTAINER);
+
+ agent::Call::RemoveNestedContainer* removeContainer =
+ call.mutable_remove_nested_container();
+
+ removeContainer->mutable_container_id()->CopyFrom(
+ previousCheckContainerId.get());
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ process::http::request(request, false)
+ .onFailed(defer(self(),
+ [this, promise](const string& failure) {
+ LOG(WARNING) << "Connection to remove the nested container '"
+ << previousCheckContainerId.get()
+ << "' used for the COMMAND check for task '"
+ << taskId << "' failed: " << failure;
+
+ // Something went wrong while sending the request, we treat this
+ // as a transient failure and discard the promise.
+ promise->discard();
+ }))
+ .onReady(defer(self(), [this, promise](const Response& response) {
+ if (response.code != process::http::Status::OK) {
+ // The agent was unable to remove the check container, we
+ // treat this as a transient failure and discard the promise.
+ LOG(WARNING) << "Received '" << response.status << "' ("
+ << response.body << ") while removing the nested"
+ << " container '" << previousCheckContainerId.get()
+ << "' used for the COMMAND check for task '"
+ << taskId << "'";
+
+ promise->discard();
+ }
+
+ previousCheckContainerId = None();
+ _nestedCommandCheck(promise);
+ }));
+ } else {
+ _nestedCommandCheck(promise);
+ }
+
+ return promise->future();
+}
+
+
+void CheckerProcess::_nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise)
+{
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
+ const TaskID _taskId = taskId;
+
+ process::http::connect(agentURL.get())
+ .onFailed(defer(self(), [_taskId, promise](const string& failure) {
+ LOG(WARNING) << "Unable to establish connection with the agent to launch"
+ << " COMMAND check for task '" << _taskId << "'"
+ << ": " << failure;
+
+ // We treat this as a transient failure.
+ promise->discard();
+ }))
+ .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1));
+}
+
+
+void CheckerProcess::__nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise,
+ Connection connection)
+{
+ ContainerID checkContainerId;
+ checkContainerId.set_value("check-" + UUID::random().toString());
+ checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get());
+
+ previousCheckContainerId = checkContainerId;
+
+ CommandInfo command(check.command().command());
+
+ agent::Call call;
+ call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ agent::Call::LaunchNestedContainerSession* launch =
+ call.mutable_launch_nested_container_session();
+
+ launch->mutable_container_id()->CopyFrom(checkContainerId);
+ launch->mutable_command()->CopyFrom(command);
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
+ {"Message-Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ // TODO(alexr): Use a lambda named capture for
+ // this cached value once it is available.
+ const Duration timeout = checkTimeout;
+
+ auto checkTimedOut = std::make_shared<bool>(false);
+
+ // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
+ // the output of the container. The agent will close the stream once
+ // the container has exited, or kill the container if the client
+ // closes the connection.
+ //
+ // We're calling `Connection::send` with `streamed = false`, so that
+ // it returns an HTTP response of type 'BODY' once the entire response
+ // is received.
+ //
+ // This means that this future will not be completed until after the
+ // check command has finished or the connection has been closed.
+ connection.send(request, false)
+ .after(checkTimeout,
+ defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+ future.discard();
+
+ *checkTimedOut = true;
+
+ return Failure("Command timed out after " + stringify(timeout));
+ }))
+ .onFailed(defer(self(),
+ &Self::nestedCommandCheckFailure,
+ promise,
+ connection,
+ checkContainerId,
+ checkTimedOut,
+ lambda::_1))
+ .onReady(defer(self(),
+ &Self::___nestedCommandCheck,
+ promise,
+ checkContainerId,
+ lambda::_1));
+}
+
+
+void CheckerProcess::___nestedCommandCheck(
+ shared_ptr<process::Promise<int>> promise,
+ const ContainerID& checkContainerId,
+ const Response& launchResponse)
+{
+ if (launchResponse.code != process::http::Status::OK) {
+ // The agent was unable to launch the check container,
+ // we treat this as a transient failure.
+ LOG(WARNING) << "Received '" << launchResponse.status << "' ("
+ << launchResponse.body << ") while launching COMMAND check"
+ << " for task '" << taskId << "'";
+
+ promise->discard();
+ return;
+ }
+
+ waitNestedContainer(checkContainerId)
+ .onFailed([promise](const string& failure) {
+ promise->fail(
+ "Unable to get the exit code: " + failure);
+ })
+ .onReady([promise](const Option<int>& status) -> void {
+ if (status.isNone()) {
+ promise->fail("Unable to get the exit code");
+ // TODO(gkleiman): Make sure that the following block works on Windows.
+ } else if (WIFSIGNALED(status.get()) &&
+ WTERMSIG(status.get()) == SIGKILL) {
+ // The check container was signaled, probably because the task
+ // finished while the check was still in-flight, so we discard
+ // the result.
+ promise->discard();
+ } else {
+ promise->set(status.get());
+ }
+ });
+}
+
+
+void CheckerProcess::nestedCommandCheckFailure(
+ shared_ptr<Promise<int>> promise,
+ Connection connection,
+ ContainerID checkContainerId,
+ shared_ptr<bool> checkTimedOut,
+ const string& failure)
+{
+ if (*checkTimedOut) {
+ // The check timed out, closing the connection will make the agent
+ // kill the container.
+ connection.disconnect();
+
+ // If the check delay interval is zero, we'll try to perform another
+ // check right after we finish processing the current timeout.
+ //
+ // We'll try to remove the container created for the check at the
+ // beginning of the next check. In order to prevent a failure, the
+ // promise should only be completed once we're sure that the
+ // container has terminated.
+ waitNestedContainer(checkContainerId)
+ .onAny([failure, promise](const Future<Option<int>>&) {
+ // We assume that once `WaitNestedContainer` returns,
+ // irrespective of whether the response contains a failure, the
+ // container will be in a terminal state, and that it will be
+ // possible to remove it.
+ //
+ // This means that we don't need to retry the
+ // `WaitNestedContainer` call.
+ promise->fail(failure);
+ });
+ } else {
+ // The agent was not able to complete the request, discarding the
+ // promise signals the checker that it should retry the check.
+ //
+ // This will allow us to recover from a blip. The executor will
+ // pause the checker when it detects that the agent is not
+ // available.
+ LOG(WARNING) << "Connection to the agent to launch COMMAND check"
+ << " for task '" << taskId << "' failed: " << failure;
+
+ promise->discard();
+ }
+}
+
+
+Future<Option<int>> CheckerProcess::waitNestedContainer(
+ const ContainerID& containerId)
+{
+ agent::Call call;
+ call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
+
+ agent::Call::WaitNestedContainer* containerWait =
+ call.mutable_wait_nested_container();
+
+ containerWait->mutable_container_id()->CopyFrom(containerId);
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ return process::http::request(request, false)
+ .repair([containerId](const Future<Response>& future) {
+ return Failure(
+ "Connection to wait for check container '" +
+ stringify(containerId) + "' failed: " + future.failure());
+ })
+ .then(defer(self(),
+ &Self::_waitNestedContainer, containerId, lambda::_1));
+}
+
+
+Future<Option<int>> CheckerProcess::_waitNestedContainer(
+ const ContainerID& containerId,
+ const Response& httpResponse)
+{
+ if (httpResponse.code != process::http::Status::OK) {
+ return Failure(
+ "Received '" + httpResponse.status + "' (" + httpResponse.body +
+ ") while waiting on check container '" + stringify(containerId) + "'");
+ }
+
+ Try<agent::Response> response =
+ deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body);
+ CHECK_SOME(response);
+
+ CHECK(response->has_wait_nested_container());
+
+ return (
+ response->wait_nested_container().has_exit_status()
+ ? Option<int>(response->wait_nested_container().exit_status())
+ : Option<int>::none());
+}
+
+
void CheckerProcess::processCommandCheckResult(
const Stopwatch& stopwatch,
- const Future<int>& result)
+ const Future<int>& future)
{
- CheckStatusInfo checkStatusInfo;
- checkStatusInfo.set_type(check.type());
+ Option<CheckStatusInfo> result;
- // On Posix, `result` corresponds to termination information in the
+ // On Posix, `future` corresponds to termination information in the
// `stat_loc` area. On Windows, `status` is obtained via calling the
// `GetExitCodeProcess()` function.
//
// TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows,
// see MESOS-7242.
- if (result.isReady() && WIFEXITED(result.get())) {
- const int exitCode = WEXITSTATUS(result.get());
- VLOG(1) << check.type() << " check for task '"
- << taskId << "' returned: " << exitCode;
+ if (future.isReady() && WIFEXITED(future.get())) {
+ const int exitCode = WEXITSTATUS(future.get());
+ VLOG(1) << check.type() << " check for task '" << taskId << "'"
+ << " returned: " << exitCode;
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
checkStatusInfo.mutable_command()->set_exit_code(
static_cast<int32_t>(exitCode));
+
+ result = checkStatusInfo;
+ } else if (future.isDiscarded()) {
+ // Check's status is currently not available due to a transient error,
+ // e.g., due to the agent failover, no `CheckStatusInfo` message should
+ // be sent to the callback.
+ LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded";
+
+ result = None();
} else {
// Check's status is currently not available, which may indicate a change
// that should be reported as an empty `CheckStatusInfo.Command` message.
- LOG(WARNING) << check.type() << " check for task '" << taskId
- << "' failed: "
- << (result.isFailed() ? result.failure() : "discarded");
+ LOG(WARNING) << check.type() << " check for task '" << taskId << "'"
+ << " failed: " << future.failure();
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
checkStatusInfo.mutable_command();
+
+ result = checkStatusInfo;
}
- processCheckResult(stopwatch, checkStatusInfo);
+ processCheckResult(stopwatch, result);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index 1521b9c..fb939d8 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -22,6 +22,7 @@
#include <mesos/mesos.hpp>
+#include <process/http.hpp>
#include <process/owned.hpp>
#include <stout/error.hpp>
@@ -43,6 +44,9 @@ public:
* Attempts to create a `Checker` object. In case of success, checking
* starts immediately after initialization.
*
+ * If the check is a COMMAND check, the checker will fork a process, enter
+ * the task's namespaces, and execute the command.
+ *
* @param check The protobuf message definition of a check.
* @param callback A callback `Checker` uses to send check status updates
* to its owner (usually an executor).
@@ -56,12 +60,38 @@ public:
* `process::Stream<CheckStatusInfo>` rather than invoking a callback.
*/
static Try<process::Owned<Checker>> create(
- const CheckInfo& checkInfo,
+ const CheckInfo& check,
const lambda::function<void(const CheckStatusInfo&)>& callback,
const TaskID& taskId,
const Option<pid_t>& taskPid,
const std::vector<std::string>& namespaces);
+ /**
+ * Attempts to create a `Checker` object. In case of success, checking
+ * starts immediately after initialization.
+ *
+ * If the check is a COMMAND check, the checker will delegate the execution
+ * of the check to the Mesos agent via the `LaunchNestedContainerSession`
+ * API call.
+ *
+ * @param check The protobuf message definition of a check.
+ * @param callback A callback `Checker` uses to send check status updates
+ * to its owner (usually an executor).
+ * @param taskId The TaskID of the target task.
+ * @param taskContainerId The ContainerID of the target task.
+ * @param agentURL The URL of the agent.
+ * @return A `Checker` object or an error if `create` fails.
+ *
+ * @todo A better approach would be to return a stream of updates, e.g.,
+ * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
+ */
+ static Try<process::Owned<Checker>> create(
+ const CheckInfo& check,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL);
+
~Checker();
// Not copyable, not assignable.
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 79785fc..9cc40c6 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -503,17 +503,13 @@ protected:
false});
if (task.has_check()) {
- // TODO(alexr): Add support for command checks.
- CHECK_NE(CheckInfo::COMMAND, task.check().type())
- << "Command checks are not supported yet";
-
Try<Owned<checks::Checker>> checker =
checks::Checker::create(
task.check(),
defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1),
taskId,
- None(),
- vector<string>());
+ containerId,
+ agent);
if (checker.isError()) {
// TODO(anand): Should we send a TASK_FAILED instead?
http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 78ac498..d7fcbf9 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -33,11 +33,16 @@
#include "checks/checker.hpp"
+#include "slave/containerizer/fetcher.hpp"
+
#include "tests/flags.hpp"
#include "tests/health_check_test_helper.hpp"
#include "tests/mesos.hpp"
#include "tests/utils.hpp"
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+
using mesos::master::detector::MasterDetector;
using mesos::v1::scheduler::Call;
@@ -211,6 +216,17 @@ public:
mesos->send(call);
}
+
+ virtual void teardown(
+ Mesos* mesos,
+ const v1::FrameworkID& frameworkId)
+ {
+ Call call;
+ call.set_type(Call::TEARDOWN);
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+
+ mesos->send(call);
+ }
};
@@ -661,10 +677,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
v1::TaskInfo taskInfo =
v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+ // Set both check and health check interval to an increased value to
+ // prevent a second update coming before reconciliation response.
+ int interval = 10;
+
v1::CheckInfo* checkInfo = taskInfo.mutable_check();
checkInfo->set_type(v1::CheckInfo::COMMAND);
checkInfo->set_delay_seconds(0);
- checkInfo->set_interval_seconds(0);
+ checkInfo->set_interval_seconds(interval);
checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
// Delay health check for 1s to ensure health update comes after check update.
@@ -676,7 +696,7 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
healthCheckInfo->set_delay_seconds(1);
- healthCheckInfo->set_interval_seconds(0);
+ healthCheckInfo->set_interval_seconds(interval);
healthCheckInfo->mutable_command()->set_value("exit 0");
launchTask(&mesos, offer, taskInfo);
@@ -860,16 +880,707 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
// These are check tests with the default executor.
class DefaultExecutorCheckTest : public CheckTest {};
-// TODO(alexr): Implement following tests once the default executor supports
-// command checks.
+
+// Verifies that a command check is supported by the default executor,
+// its status is delivered in a task status update, and the last known
+// status can be obtained during explicit and implicit reconciliation.
+// Additionally ensures that the specified environment of the command
+// check is honored.
//
-// 1. COMMAND check with env var works, is delivered, and is reconciled
-// properly.
-// 2. COMMAND check's status change is delivered. TODO(alexr): When check
-// mocking is available, ensure only status changes are delivered.
-// 3. COMMAND check times out.
-// 4. COMMAND check and health check do not shadow each other; upon
-// reconciliation both statuses are available.
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DefaultExecutorCheckTest,
+    CommandCheckDeliveredAndReconciled)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  // Fail fast: `offers(0)` below must not be evaluated on an empty list.
+  ASSERT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  // The first four status updates are captured in this order; anything
+  // arriving afterwards is ignored.
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateExplicitReconciliation;
+  Future<Event::Update> updateImplicitReconciliation;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateExplicitReconciliation))
+    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+
+  // The check exits with the value of `STATUS`, which is passed only via
+  // the check's environment; exit code 1 below shows it was honored.
+  v1::CommandInfo* checkCommand =
+    checkInfo->mutable_command()->mutable_command();
+  checkCommand->set_value("exit $STATUS");
+
+  v1::Environment::Variable* variable =
+    checkCommand->mutable_environment()->add_variables();
+  variable->set_name("STATUS");
+  variable->set_value("1");
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  // The initial `TASK_RUNNING` update carries a command check status
+  // without an exit code.
+  AWAIT_READY(updateTaskRunning);
+  const v1::TaskStatus& taskRunning = updateTaskRunning->status();
+
+  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
+  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
+  EXPECT_TRUE(taskRunning.has_check_status());
+  EXPECT_TRUE(taskRunning.check_status().has_command());
+  EXPECT_FALSE(taskRunning.check_status().command().has_exit_code());
+
+  acknowledge(&mesos, frameworkId, taskRunning);
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  // Trigger explicit reconciliation.
+  reconcile(
+      &mesos,
+      frameworkId,
+      {std::make_pair(checkResult.task_id(), checkResult.agent_id())});
+
+  AWAIT_READY(updateExplicitReconciliation);
+  const v1::TaskStatus& explicitReconciliation =
+    updateExplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      explicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id());
+  EXPECT_TRUE(explicitReconciliation.has_check_status());
+  EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, explicitReconciliation);
+
+  // Trigger implicit reconciliation.
+  reconcile(&mesos, frameworkId, {});
+
+  AWAIT_READY(updateImplicitReconciliation);
+  const v1::TaskStatus& implicitReconciliation =
+    updateImplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      implicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+  EXPECT_TRUE(implicitReconciliation.has_check_status());
+  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that a command check's status changes are delivered.
+//
+// TODO(alexr): When check mocking is available, ensure that *only*
+// status changes are delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DefaultExecutorCheckTest,
+    CommandCheckStatusChange)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  // Fail fast: `offers(0)` below must not be evaluated on an empty list.
+  ASSERT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  // The first four status updates are captured in this order; anything
+  // arriving afterwards is ignored.
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateCheckResultChanged;
+  Future<Event::Update> updateCheckResultBack;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateCheckResultChanged))
+    .WillOnce(FutureArg<1>(&updateCheckResultBack))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  // The assertions below expect the flapping check to report exit codes
+  // 1, then 0, then 1 again.
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+  checkInfo->mutable_command()->mutable_command()->set_value(
+      FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  // First check result: exit code 1.
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  // Status change: exit code 0.
+  AWAIT_READY(updateCheckResultChanged);
+  const v1::TaskStatus& checkResultChanged = updateCheckResultChanged->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultChanged.reason());
+  EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
+  EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResultChanged);
+
+  // Status changes back: exit code 1.
+  AWAIT_READY(updateCheckResultBack);
+  const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultBack.reason());
+  EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when a command check times out after a successful check,
+// an empty check status update is delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  // Fail fast: `offers(0)` below must not be evaluated on an empty list.
+  ASSERT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  // The first three status updates are captured in this order; anything
+  // arriving afterwards is ignored.
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateCheckResultTimeout;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateCheckResultTimeout))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  // The check is given a 1s timeout. The assertions below expect a first
+  // result with exit code 1, followed by an update without an exit code.
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+  checkInfo->set_timeout_seconds(1);
+  checkInfo->mutable_command()->mutable_command()->set_value(
+      STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  // After the timeout the check status is reported without an exit code.
+  AWAIT_READY(updateCheckResultTimeout);
+  const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultTimeout.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultTimeout.reason());
+  EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when both command check and health check are specified,
+// health and check updates include both statuses. Also verifies that
+// both statuses are included upon reconciliation.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DefaultExecutorCheckTest,
+    CommandCheckAndHealthCheckNoShadowing)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  // Fail fast: `offers(0)` below must not be evaluated on an empty list.
+  ASSERT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  // The first four status updates are captured in this order; anything
+  // arriving afterwards is ignored.
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateHealthResult;
+  Future<Event::Update> updateImplicitReconciliation;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateHealthResult))
+    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  // Set both check and health check interval to an increased value to
+  // prevent a second update coming before reconciliation response.
+  int interval = 10;
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(interval);
+  checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
+
+  // Delay health check for 1s to ensure health update comes after check update.
+  //
+  // TODO(alexr): This can lead to flakiness on busy agents. A more robust
+  // approach could be setting the grace period to MAX_INT, and make the
+  // health check pass iff a file created by the check exists. Alternatively,
+  // we can relax the expectation that the check update is delivered first.
+  v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
+  healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
+  healthCheckInfo->set_delay_seconds(1);
+  healthCheckInfo->set_interval_seconds(interval);
+  healthCheckInfo->mutable_command()->set_value("exit 0");
+
+  // Launch the task as a task group together with `executorInfo`, so that
+  // the executor configured above (type `DEFAULT`) is the one used.
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  // The check update is expected first and must not carry a health status.
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_FALSE(checkResult.has_healthy());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  // The health update must carry both the health and the check status.
+  AWAIT_READY(updateHealthResult);
+  const v1::TaskStatus& healthResult = updateHealthResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, healthResult.state());
+  EXPECT_EQ(taskInfo.task_id(), healthResult.task_id());
+  EXPECT_TRUE(healthResult.has_healthy());
+  EXPECT_TRUE(healthResult.healthy());
+  EXPECT_TRUE(healthResult.has_check_status());
+  EXPECT_TRUE(healthResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, healthResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, healthResult);
+
+  // Trigger implicit reconciliation.
+  reconcile(&mesos, frameworkId, {});
+
+  AWAIT_READY(updateImplicitReconciliation);
+  const v1::TaskStatus& implicitReconciliation =
+    updateImplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      implicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+  EXPECT_TRUE(implicitReconciliation.has_healthy());
+  EXPECT_TRUE(implicitReconciliation.healthy());
+  EXPECT_TRUE(implicitReconciliation.has_check_status());
+  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
// Verifies that an HTTP check is supported by the default executor and
// its status is delivered in a task status update.