You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/24 04:36:09 UTC
[3/6] mesos git commit: Added support for COMMAND health checks to
default executor.
Added support for COMMAND health checks to default executor.
Review: https://reviews.apache.org/r/55901/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4bbfaebb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4bbfaebb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4bbfaebb
Branch: refs/heads/master
Commit: 4bbfaebb793b3f08e9acf1f358e881da02b4a068
Parents: b97d682
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Fri Mar 24 00:48:50 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 00:56:44 2017 +0100
----------------------------------------------------------------------
src/checks/health_checker.cpp | 256 ++++++++++++++++++++++++++++++++-
src/checks/health_checker.hpp | 74 +++++++++-
src/launcher/default_executor.cpp | 12 +-
src/tests/health_check_tests.cpp | 113 +++++++++++++++
4 files changed, 445 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 236d07b..3a7de78 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -29,6 +29,8 @@
#include <mesos/mesos.hpp>
+#include <mesos/agent/agent.hpp>
+
#include <process/collect.hpp>
#include <process/delay.hpp>
#include <process/dispatch.hpp>
@@ -50,9 +52,12 @@
#include <stout/os/constants.hpp>
#include <stout/os/killtree.hpp>
+#include "common/http.hpp"
#include "common/status_utils.hpp"
#include "common/validation.hpp"
+#include "internal/evolve.hpp"
+
#ifdef __linux__
#include "linux/ns.hpp"
#endif
@@ -66,6 +71,9 @@ using process::Owned;
using process::Subprocess;
using process::Time;
+using process::http::Connection;
+using process::http::Response;
+
using std::map;
using std::string;
using std::tuple;
@@ -140,7 +148,39 @@ Try<Owned<HealthChecker>> HealthChecker::create(
callback,
taskId,
taskPid,
- namespaces));
+ namespaces,
+ None(),
+ None(),
+ false));
+
+ return Owned<HealthChecker>(new HealthChecker(process));
+}
+
+
+// Overload used when COMMAND checks are delegated to the agent: the
+// process is built with `commandCheckViaAgent == true`, so COMMAND checks
+// run through `nestedCommandHealthCheck` (a nested container launched via
+// the agent's HTTP API at `agentURL`) instead of forking locally.
+// Returns the validation error if `check` is invalid.
+Try<Owned<HealthChecker>> HealthChecker::create(
+ const HealthCheck& check,
+ const string& launcherDir,
+ const lambda::function<void(const TaskHealthStatus&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL)
+{
+ // Validate the 'HealthCheck' protobuf.
+ Option<Error> error = validation::healthCheck(check);
+ if (error.isSome()) {
+ return error.get();
+ }
+
+ Owned<HealthCheckerProcess> process(new HealthCheckerProcess(
+ check,
+ launcherDir,
+ callback,
+ taskId,
+ None(), // taskPid: none, the agent runs COMMAND checks.
+ {}, // namespaces: none.
+ taskContainerId,
+ agentURL,
+ true)); // commandCheckViaAgent.
return Owned<HealthChecker>(new HealthChecker(process));
}
@@ -175,7 +215,10 @@ HealthCheckerProcess::HealthCheckerProcess(
const lambda::function<void(const TaskHealthStatus&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const vector<string>& _namespaces)
+ const vector<string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent)
: ProcessBase(process::ID::generate("health-checker")),
check(_check),
launcherDir(_launcherDir),
@@ -183,6 +226,9 @@ HealthCheckerProcess::HealthCheckerProcess(
taskId(_taskId),
taskPid(_taskPid),
namespaces(_namespaces),
+ taskContainerId(_taskContainerId),
+ agentURL(_agentURL),
+ commandCheckViaAgent(_commandCheckViaAgent),
consecutiveFailures(0),
initializing(true)
{
@@ -285,7 +331,8 @@ void HealthCheckerProcess::performSingleCheck()
switch (check.type()) {
case HealthCheck::COMMAND: {
- checkResult = commandHealthCheck();
+ checkResult = commandCheckViaAgent ? nestedCommandHealthCheck()
+ : commandHealthCheck();
break;
}
@@ -417,6 +464,209 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
}
+// Entry point for a COMMAND check delegated to the agent (selected in
+// `performSingleCheck` when `commandCheckViaAgent` is set). Opens an HTTP
+// connection to `agentURL` and continues in `_nestedCommandHealthCheck` on
+// this process. The returned future fails if the connection cannot be
+// established; otherwise it carries the result of the rest of the chain.
+Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
+{
+ // State required for agent-delegated checks; the agent-based `create`
+ // overload is what supplies `taskContainerId` and `agentURL`.
+ CHECK_EQ(HealthCheck::COMMAND, check.type());
+ CHECK_SOME(taskContainerId);
+ CHECK(check.has_command());
+ CHECK_SOME(agentURL);
+
+ VLOG(1) << "Launching command health check of task " << stringify(taskId);
+
+ // `repair` turns a failed connection attempt into a descriptive Failure.
+ return process::http::connect(agentURL.get())
+ .repair([](const Future<Connection>& future) {
+ return Failure(
+ "Unable to establish connection with the agent: " + future.failure());
+ })
+ .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1));
+}
+
+
+// Second step of an agent-delegated COMMAND check: sends a
+// `LAUNCH_NESTED_CONTAINER_SESSION` call over `connection` that runs
+// `check.command()` in a nested container whose parent is the task's
+// container.
+//
+// If the session has not completed within `checkTimeout`,
+// `nestedCommandHealthCheckTimedOut` handles it; otherwise the launch
+// response is processed by `__nestedCommandHealthCheck`.
+Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
+ Connection connection)
+{
+ // The check container ID is derived deterministically from the task's
+ // container ID ("<task container id>-health-check"), so every check run
+ // uses the same ID.
+ // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe.
+ ContainerID checkContainerId;
+ checkContainerId.set_value(taskContainerId.get().value() + "-health-check");
+ checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get());
+
+ CommandInfo command(check.command());
+
+ agent::Call call;
+ call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+ agent::Call::LaunchNestedContainerSession* launch =
+ call.mutable_launch_nested_container_session();
+
+ launch->mutable_container_id()->CopyFrom(checkContainerId);
+ launch->mutable_command()->CopyFrom(command);
+
+ // The request body is protobuf; the session output is requested as
+ // RecordIO-framed protobuf messages.
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
+ {"Message-Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
+ // the output of the container. The agent will close the stream once
+ // the container has exited, or kill the container if the client
+ // closes the connection.
+ //
+ // We're calling `Connection::send` with `streamed = false`, so that
+ // it returns an HTTP response of type 'BODY' once the entire response
+ // is received.
+ //
+ // This means that this future will not be completed until after the
+ // health check command has finished or the connection has been
+ // closed.
+ return connection.send(request, false)
+ .after(checkTimeout,
+ defer(self(),
+ &Self::nestedCommandHealthCheckTimedOut,
+ checkContainerId,
+ connection,
+ lambda::_1))
+ .then(defer(self(),
+ &Self::__nestedCommandHealthCheck,
+ checkContainerId,
+ lambda::_1));
+}
+
+
+// Third step of an agent-delegated COMMAND check. Rejects a non-200 launch
+// response, then waits for the check container to terminate via
+// `waitNestedContainer`. The check succeeds only when the exit status is
+// known and equal to 0. An unknown or non-zero status, a non-OK response,
+// or a failed wait each yield a `Failure` naming the task.
+Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck(
+ const ContainerID& checkContainerId,
+ const Response& launchResponse)
+{
+ if (launchResponse.code != process::http::Status::OK) {
+ return Failure(
+ "Received '" + launchResponse.status + "' (" + launchResponse.body +
+ ") while launching a command health check of task '" +
+ stringify(taskId) + "'");
+ }
+
+ // We need to make a copy so that the lambdas can capture it.
+ const TaskID taskId_ = taskId;
+
+ return waitNestedContainer(checkContainerId)
+ .repair([taskId_](const Future<Option<int>>& future) {
+ return Failure(
+ "Unable to get the exit code of command health check of task '" +
+ stringify(taskId_) + "': " + future.failure());
+ })
+ .then([taskId_](const Option<int> status) -> Future<Nothing> {
+ // `status` is the wait status reported by the agent; it is
+ // rendered with `WSTRINGIFY` for the error message.
+ if (status.isNone()) {
+ return Failure(
+ "Unable to get the exit code of command health check of task '" +
+ stringify(taskId_) + "'");
+ } else if (status.get() != 0) {
+ return Failure(
+ "Command health check of task '" + stringify(taskId_) +
+ "' returned " + WSTRINGIFY(status.get()));
+ } else {
+ return Nothing();
+ }
+ });
+}
+
+
+// Invoked through `Future::after` when the launch session has not completed
+// within `checkTimeout`. Discards the pending response, disconnects from
+// the agent, and then waits for the check container to terminate.
+//
+// The returned future always fails with the timeout `failure`, whether or
+// not the wait itself succeeds. It only does so after the wait has
+// finished, so that the next check does not race with the old container.
+Future<Response>
+HealthCheckerProcess::nestedCommandHealthCheckTimedOut(
+ const ContainerID& checkContainerId,
+ Connection connection,
+ Future<Response> future)
+{
+ // Request discard of the still-pending `send()` result.
+ future.discard();
+
+ // Closing the connection will make the agent kill the container.
+ connection.disconnect();
+
+ const Failure failure = Failure(
+ "Command health check of task '" + stringify(taskId) +
+ "' has timed out after " + stringify(checkTimeout));
+
+ // We need to make a copy so that the lambda can capture it.
+ const TaskID taskId_ = taskId;
+
+ // If the health check delay interval is zero, we'll try to perform
+ // another health check right after we finish processing the current
+ // timeout.
+ //
+ // All the containers created for the health checks reuse the same
+ // `ContainerID`. In order to prevent conflicts, the future returned
+ // by this method should only be completed once we're sure that the
+ // container has been cleaned up.
+ return waitNestedContainer(checkContainerId)
+ .repair([failure, taskId_](const Future<Option<int>>& waitFuture) {
+ // We assume that once `WaitNestedContainer` returns, irrespective of
+ // whether the response contains a failure, the container will be in a
+ // terminal state, so starting a new health check will not lead to a
+ // transient failure.
+ //
+ // This means that we don't need to retry the `WaitNestedContainer`
+ // call.
+ LOG(WARNING) << "Unable to get the exit code of command health check of "
+ << "task '" << stringify(taskId_)
+ << "': " << waitFuture.failure();
+
+ return Future<Option<int>>(failure);
+ })
+ // Reached only if the wait succeeded; the check still failed (timeout).
+ .then([failure](const Option<int>&) {
+ return Future<Response>(failure);
+ });
+}
+
+
+// Sends a `WAIT_NESTED_CONTAINER` call for `containerId` to the agent at
+// `agentURL` and hands the HTTP response to `_waitNestedContainer` (on this
+// process) for parsing. Transport-level errors propagate as a failed future.
+Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
+ const ContainerID& containerId)
+{
+ agent::Call call;
+ call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
+
+ agent::Call::WaitNestedContainer* containerWait =
+ call.mutable_wait_nested_container();
+
+ containerWait->mutable_container_id()->CopyFrom(containerId);
+
+ process::http::Request request;
+ request.method = "POST";
+ request.url = agentURL.get();
+ request.body = serialize(ContentType::PROTOBUF, evolve(call));
+ request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+ {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+ // `false`: non-streamed response, i.e. the whole body is buffered.
+ return process::http::request(request, false)
+ .then(defer(self(),
+ &Self::_waitNestedContainer,
+ containerId,
+ lambda::_1));
+}
+
+
+// Continuation of `waitNestedContainer`: converts the agent's HTTP response
+// to a `WAIT_NESTED_CONTAINER` call into the container's exit status.
+//
+// Returns the exit status, or `None` if the response carries no
+// `exit_status`. Returns a `Failure` for a non-200 response, a body that
+// cannot be deserialized, or a response without `wait_nested_container`.
+// These are agent-side conditions, so they fail the current health check
+// rather than aborting the whole executor process via `CHECK`.
+//
+// `containerId` identifies the waited-on container; it is not read here.
+Future<Option<int>> HealthCheckerProcess::_waitNestedContainer(
+    const ContainerID& containerId,
+    const Response& httpResponse)
+{
+  if (httpResponse.code != process::http::Status::OK) {
+    return Failure(
+        "Received '" + httpResponse.status + "' (" + httpResponse.body +
+        ") while waiting on health check of task '" + stringify(taskId) + "'");
+  }
+
+  Try<agent::Response> response =
+    deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body);
+  if (response.isError()) {
+    return Failure(
+        "Failed to deserialize 'WAIT_NESTED_CONTAINER' response for health"
+        " check of task '" + stringify(taskId) + "': " + response.error());
+  }
+
+  if (!response->has_wait_nested_container()) {
+    return Failure(
+        "Received 'WAIT_NESTED_CONTAINER' response without a result for"
+        " health check of task '" + stringify(taskId) + "'");
+  }
+
+  return (
+      response->wait_nested_container().has_exit_status()
+        ? Option<int>(response->wait_nested_container().exit_status())
+        : Option<int>::none());
+}
+
+
Future<Nothing> HealthCheckerProcess::httpHealthCheck()
{
CHECK_EQ(HealthCheck::HTTP, check.type());
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index 44df544..a7307ac 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -51,6 +51,9 @@ public:
* Attempts to create a `HealthChecker` object. In case of success, health
* checking starts immediately after initialization.
*
+ * If the check is a command health check, the checker will fork a process,
+ * enter the task's namespaces, and execute the command.
+ *
* @param check The protobuf message definition of health check.
* @param launcherDir A directory where Mesos helper binaries are located.
* @param callback A callback HealthChecker uses to send health status
@@ -76,6 +79,35 @@ public:
const Option<pid_t>& taskPid,
const std::vector<std::string>& namespaces);
+ /**
+ * Attempts to create a `HealthChecker` object. In case of success, health
+ * checking starts immediately after initialization.
+ *
+ * If the check is a command health check, the checker will delegate the
+ * execution of the check to the Mesos agent via the
+ * `LaunchNestedContainerSession` API call.
+ *
+ * @param check The protobuf message definition of health check.
+ * @param launcherDir A directory where Mesos helper binaries are located.
+ * @param callback A callback HealthChecker uses to send health status
+ * updates to its owner (usually an executor).
+ * @param taskId The TaskID of the target task.
+ * @param taskContainerId The ContainerID of the target task.
+ * @param agentURL The URL of the agent.
+ * @return A `HealthChecker` object or an error if `create` fails.
+ *
+ * @todo A better approach would be to return a stream of updates, e.g.,
+ * `process::Stream<TaskHealthStatus>` rather than invoking a callback.
+ */
+ static Try<process::Owned<HealthChecker>> create(
+ const HealthCheck& check,
+ const std::string& launcherDir,
+ const lambda::function<void(const TaskHealthStatus&)>& callback,
+ const TaskID& taskId,
+ const ContainerID& taskContainerId,
+ const process::http::URL& agentURL);
+
+
~HealthChecker();
/**
@@ -99,7 +131,10 @@ public:
const lambda::function<void(const TaskHealthStatus&)>& _callback,
const TaskID& _taskId,
const Option<pid_t>& _taskPid,
- const std::vector<std::string>& _namespaces);
+ const std::vector<std::string>& _namespaces,
+ const Option<ContainerID>& _taskContainerId,
+ const Option<process::http::URL>& _agentURL,
+ bool _commandCheckViaAgent);
virtual ~HealthCheckerProcess() {}
@@ -117,6 +152,39 @@ private:
process::Future<Nothing> commandHealthCheck();
+ process::Future<Nothing> nestedCommandHealthCheck();
+
+ process::Future<Nothing> _nestedCommandHealthCheck(
+ process::http::Connection connection);
+
+ process::Future<Nothing> __nestedCommandHealthCheck(
+ const ContainerID& checkContainerId,
+ const process::http::Response& launchResponse);
+
+ process::Future<process::http::Response>
+ nestedCommandHealthCheckTimedOut(
+ const ContainerID& checkContainerId,
+ process::http::Connection connection,
+ process::Future<process::http::Response> future);
+
+ /**
+ * Waits for a container to be terminated.
+ *
+ * Waits for a container to be terminated via the Agent's
+ * `WaitNestedContainer` API call.
+ *
+ * @param containerId The `ContainerID` of the container that we want
+ * to wait for.
+ *
+ * @return The exit status as returned by the Agent.
+ */
+ process::Future<Option<int>> waitNestedContainer(
+ const ContainerID& containerId);
+
+ process::Future<Option<int>> _waitNestedContainer(
+ const ContainerID& containerId,
+ const process::http::Response& httpResponse);
+
process::Future<Nothing> httpHealthCheck();
process::Future<Nothing> _httpHealthCheck(
@@ -148,6 +216,10 @@ private:
const TaskID taskId;
const Option<pid_t> taskPid;
const std::vector<std::string> namespaces;
+ const Option<ContainerID> taskContainerId;
+ const Option<process::http::URL> agentURL;
+ const bool commandCheckViaAgent;
+
Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
uint32_t consecutiveFailures;
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index f19bba9..58efb4c 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -143,6 +143,8 @@ public:
connectionId = UUID::random();
doReliableRegistration();
+
+ // TODO(gkleiman): Resume (health) checks.
}
void disconnected()
@@ -160,6 +162,8 @@ public:
container->waiting = None();
}
}
+
+ // TODO(gkleiman): Stop (health) checks.
}
void received(const Event& event)
@@ -502,18 +506,14 @@ protected:
}
if (task.has_health_check()) {
- // TODO(anand): Add support for command health checks.
- CHECK_NE(HealthCheck::COMMAND, task.health_check().type())
- << "Command health checks are not supported yet";
-
Try<Owned<checks::HealthChecker>> healthChecker =
checks::HealthChecker::create(
task.health_check(),
launcherDirectory,
defer(self(), &Self::taskHealthUpdated, lambda::_1),
taskId,
- None(),
- vector<string>());
+ containerId,
+ agent);
if (healthChecker.isError()) {
// TODO(anand): Should we send a TASK_FAILED instead?
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index a5b35cf..211f8b8 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -35,6 +35,7 @@
#include "tests/health_check_test_helper.hpp"
#include "tests/mesos.hpp"
#include "tests/mock_docker.hpp"
+#include "tests/resources_utils.hpp"
#include "tests/utils.hpp"
#ifdef __linux__
@@ -2131,6 +2132,118 @@ TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthyTaskViaTCP)
}
}
+
+// Verifies that the default executor supports COMMAND health checks: a task
+// launched in a task group with a check command that always exits with 0
+// (`exit $STATUS` with STATUS=0) is reported as healthy.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    HealthCheckTest, DefaultExecutorCommandHealthCheck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command health
+  // checks) tries to start an I/O switchboard, which doesn't work in
+  // local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  // Must be fatal: `offers->front()` below is undefined behavior on an
+  // empty vector, and a failing `EXPECT_*` would not stop the test.
+  ASSERT_NE(0u, offers->size());
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusHealthy;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusHealthy));
+
+  TaskInfo task = createTask(offers->front(), "sleep 120");
+
+  // The check command exits with the value of `STATUS`, which the check's
+  // environment sets to 0, so every check run succeeds.
+  HealthCheck healthCheck;
+
+  healthCheck.set_type(HealthCheck::COMMAND);
+  healthCheck.mutable_command()->set_value("exit $STATUS");
+  healthCheck.set_delay_seconds(0);
+  healthCheck.set_interval_seconds(0);
+  healthCheck.set_grace_period_seconds(0);
+
+  Environment::Variable* variable = healthCheck.mutable_command()->
+    mutable_environment()->mutable_variables()->Add();
+  variable->set_name("STATUS");
+  variable->set_value("0");
+
+  task.mutable_health_check()->CopyFrom(healthCheck);
+
+  // Subtract the executor's resources from the task's resources so that
+  // both fit into the accepted offer.
+  Resources executorResources =
+    allocatedResources(Resources::parse("cpus:0.1;mem:32;disk:32").get(), "*");
+
+  task.mutable_resources()->CopyFrom(task.resources() - executorResources);
+
+  TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.set_type(ExecutorInfo::DEFAULT);
+  executor.mutable_framework_id()->CopyFrom(frameworkId.get());
+  executor.mutable_resources()->CopyFrom(executorResources);
+  executor.mutable_shutdown_grace_period()->set_nanoseconds(Seconds(10).ns());
+
+  driver.acceptOffers(
+      {offers->front().id()}, {LAUNCH_GROUP(executor, taskGroup)});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  AWAIT_READY(statusHealthy);
+  EXPECT_EQ(TASK_RUNNING, statusHealthy.get().state());
+  EXPECT_TRUE(statusHealthy.get().has_healthy());
+  EXPECT_TRUE(statusHealthy.get().healthy());
+
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+
+  AWAIT_READY(containerIds);
+
+  driver.stop();
+  driver.join();
+
+  // Cleanup all mesos launched containers.
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {