You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/15 21:21:09 UTC
[08/13] mesos git commit: Updated checks library with general check
support.
Updated checks library with general check support.
Add support for general checks, i.e., those defined by CheckInfo, to
the checks library. A general check can be either a command or
an HTTP request. The library performs the requested check at
the specified interval and sends the result to the framework
via a task status update. If the current result is the same as
the previous result, no status update is sent.
Review: https://reviews.apache.org/r/56208
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f042bf60
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f042bf60
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f042bf60
Branch: refs/heads/master
Commit: f042bf607f9bdcf4d350df892b6985760a7ea7e8
Parents: bf196f0
Author: Alexander Rukletsov <al...@apache.org>
Authored: Sun Feb 5 21:53:51 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Mar 15 22:20:20 2017 +0100
----------------------------------------------------------------------
src/checks/checker.cpp | 578 ++++++++++++++++++++++++++++++++++++-
src/checks/checker.hpp | 53 ++++
src/checks/health_checker.cpp | 2 +-
src/checks/health_checker.hpp | 3 +
4 files changed, 634 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 8716e4c..94315d7 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -16,20 +16,596 @@
#include "checks/checker.hpp"
+#include <cstdint>
+#include <iterator>
+#include <map>
#include <string>
+#include <tuple>
+#include <vector>
-#include <mesos/mesos.hpp>
+#include <glog/logging.h>
+#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/time.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/exit.hpp>
+#include <stout/foreach.hpp>
+#include <stout/jsonify.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stopwatch.hpp>
#include <stout/strings.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/environment.hpp>
+#include <stout/os/killtree.hpp>
+#include "common/protobuf_utils.hpp"
+#include "common/status_utils.hpp"
#include "common/validation.hpp"
+#ifdef __linux__
+#include "linux/ns.hpp"
+#endif
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+
+using std::map;
using std::string;
+using std::tuple;
+using std::vector;
namespace mesos {
namespace internal {
namespace checks {
+#ifndef __WINDOWS__
+constexpr char HTTP_CHECK_COMMAND[] = "curl";
+#else
+constexpr char HTTP_CHECK_COMMAND[] = "curl.exe";
+#endif // __WINDOWS__
+
+static const string DEFAULT_HTTP_SCHEME = "http";
+
+// Use '127.0.0.1' instead of 'localhost', because the host
+// file in some container images may not contain 'localhost'.
+static const string DEFAULT_DOMAIN = "127.0.0.1";
+
+// Linux only: a clone function that calls `ns::setns` before running `func`.
+#ifdef __linux__
+// TODO(alexr): Instead of defining this ad-hoc clone function, provide a
+// general solution for entering namespaces in child processes, see MESOS-6184.
+static pid_t cloneWithSetns(
+ const lambda::function<int()>& func,
+ const Option<pid_t>& taskPid,
+ const vector<string>& namespaces)
+{
+ return process::defaultClone([=]() -> int {
+ if (taskPid.isSome()) { // Without a task pid no namespace is entered.
+ foreach (const string& ns, namespaces) {
+ Try<Nothing> setns = ns::setns(taskPid.get(), ns);
+ if (setns.isError()) {
+ // This effectively aborts the check.
+ LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
+ << "task (pid: '" << taskPid.get() << "'): "
+ << setns.error();
+ }
+
+ VLOG(1) << "Entered the " << ns << " namespace of "
+ << "task (pid: '" << taskPid.get() << "') successfully";
+ }
+ }
+ // The wrapped function runs last and its result becomes the return value.
+ return func();
+ });
+}
+#endif
+
+// Process that performs one check periodically and reports result changes.
+class CheckerProcess : public ProtobufProcess<CheckerProcess>
+{
+public:
+ CheckerProcess(
+ const CheckInfo& _check,
+ const lambda::function<void(const CheckStatusInfo&)>& _callback,
+ const TaskID& _taskId,
+ const Option<pid_t>& _taskPid,
+ const std::vector<std::string>& _namespaces);
+
+ virtual ~CheckerProcess() {}
+
+protected:
+ void initialize() override; // Schedules the first check after `checkDelay`.
+ void finalize() override; // Logs that checking has stopped.
+
+private:
+ void performCheck(); // Dispatches on `check.type()`.
+ void scheduleNext(const Duration& duration);
+ void processCheckResult(
+ const Stopwatch& stopwatch,
+ const CheckStatusInfo& result);
+ // Command checks: launch the command, then interpret its exit status.
+ process::Future<int> commandCheck();
+ void processCommandCheckResult(
+ const Stopwatch& stopwatch,
+ const process::Future<int>& result);
+ // HTTP checks: launch curl, parse its output, then report the status code.
+ process::Future<int> httpCheck();
+ process::Future<int> _httpCheck(
+ const std::tuple<
+ process::Future<Option<int>>,
+ process::Future<std::string>,
+ process::Future<std::string>>& t);
+ void processHttpCheckResult(
+ const Stopwatch& stopwatch,
+ const process::Future<int>& result);
+ // Check definition and the durations derived from it in the constructor.
+ const CheckInfo check;
+ Duration checkDelay;
+ Duration checkInterval;
+ Duration checkTimeout; // `Duration::max()` if the timeout is not positive.
+ // Invoked with the new status whenever it differs from the previous one.
+ const lambda::function<void(const CheckStatusInfo&)> updateCallback;
+ const TaskID taskId;
+ const Option<pid_t> taskPid;
+ const std::vector<std::string> namespaces;
+ Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
+ // Status that new results are compared against to skip duplicate updates.
+ CheckStatusInfo previousCheckStatus;
+};
+
+// Validates `check`; on success returns a `Checker` owning a new process.
+Try<Owned<Checker>> Checker::create(
+ const CheckInfo& check,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const Option<pid_t>& taskPid,
+ const vector<string>& namespaces)
+{
+ // Validate the `CheckInfo` protobuf.
+ Option<Error> error = validation::checkInfo(check);
+ if (error.isSome()) {
+ return error.get(); // The validation error is returned to the caller.
+ }
+ // All arguments are forwarded unchanged to the process constructor.
+ Owned<CheckerProcess> process(new CheckerProcess(
+ check,
+ callback,
+ taskId,
+ taskPid,
+ namespaces));
+
+ return Owned<Checker>(new Checker(process));
+}
+
+// Takes the given process and spawns it right away.
+Checker::Checker(Owned<CheckerProcess> _process)
+ : process(_process)
+{
+ spawn(CHECK_NOTNULL(process.get())); // A null process is a programming error.
+}
+
+// Terminates the process and waits on it before the destructor returns.
+Checker::~Checker()
+{
+ terminate(process.get());
+ wait(process.get());
+}
+
+// Terminates the process without waiting for it.
+void Checker::stop()
+{
+ terminate(process.get(), true); // NOTE(review): `true` is presumably the inject flag; confirm.
+}
+
+// Caches the check durations, seeds the empty status and sets up `clone`.
+CheckerProcess::CheckerProcess(
+ const CheckInfo& _check,
+ const lambda::function<void(const CheckStatusInfo&)>& _callback,
+ const TaskID& _taskId,
+ const Option<pid_t>& _taskPid,
+ const vector<string>& _namespaces)
+ : ProcessBase(process::ID::generate("checker")),
+ check(_check),
+ updateCallback(_callback),
+ taskId(_taskId),
+ taskPid(_taskPid),
+ namespaces(_namespaces)
+{
+ Try<Duration> create = Duration::create(check.delay_seconds());
+ CHECK_SOME(create);
+ checkDelay = create.get(); // Delay before the first check.
+
+ create = Duration::create(check.interval_seconds());
+ CHECK_SOME(create);
+ checkInterval = create.get(); // Delay between a result and the next check.
+
+ // Zero value means infinite timeout.
+ create = Duration::create(check.timeout_seconds());
+ CHECK_SOME(create);
+ checkTimeout =
+ (create.get() > Duration::zero()) ? create.get() : Duration::max();
+
+ // The first check update should be sent only when a check succeeds,
+ // hence we should deduplicate against a corresponding "empty" result.
+ previousCheckStatus.set_type(check.type());
+ switch (check.type()) {
+ case CheckInfo::COMMAND: {
+ previousCheckStatus.mutable_command(); // Sets an empty command message.
+ break;
+ }
+
+ case CheckInfo::HTTP: {
+ previousCheckStatus.mutable_http(); // Sets an empty HTTP message.
+ break;
+ }
+
+ case CheckInfo::UNKNOWN: {
+ LOG(FATAL) << "Received UNKNOWN check type";
+ break;
+ }
+ }
+ // Only Linux builds assign `clone`, and only if namespaces were requested.
+#ifdef __linux__
+ if (!namespaces.empty()) {
+ clone = lambda::bind(&cloneWithSetns, lambda::_1, taskPid, namespaces);
+ }
+#endif
+}
+
+// Logs the check configuration and schedules the first check.
+void CheckerProcess::initialize()
+{
+ VLOG(1) << "Check configuration for task " << taskId << ":"
+ << " '" << jsonify(JSON::Protobuf(check)) << "'";
+ // The first check runs after `checkDelay`, later ones after `checkInterval`.
+ scheduleNext(checkDelay);
+}
+
+// Logs that checking for this task has stopped.
+void CheckerProcess::finalize()
+{
+ LOG(INFO) << "Checking for task " << taskId << " stopped";
+}
+
+// Starts a check of the configured type; the result is handled on this process.
+void CheckerProcess::performCheck()
+{
+ Stopwatch stopwatch;
+ stopwatch.start();
+ // The started stopwatch is handed to the handler, which logs the elapsed time.
+ switch (check.type()) {
+ case CheckInfo::COMMAND: {
+ commandCheck().onAny(defer(
+ self(),
+ &Self::processCommandCheckResult, stopwatch, lambda::_1));
+ break;
+ }
+
+ case CheckInfo::HTTP: {
+ httpCheck().onAny(defer(
+ self(),
+ &Self::processHttpCheckResult, stopwatch, lambda::_1));
+ break;
+ }
+
+ case CheckInfo::UNKNOWN: {
+ LOG(FATAL) << "Received UNKNOWN check type";
+ break;
+ }
+ }
+}
+
+// Arranges for `performCheck` to be invoked on this process after `duration`.
+void CheckerProcess::scheduleNext(const Duration& duration)
+{
+ VLOG(1) << "Scheduling check for task " << taskId << " in " << duration;
+
+ delay(duration, self(), &Self::performCheck);
+}
+
+// Reports `result` via the callback if it changed, then schedules the next check.
+void CheckerProcess::processCheckResult(
+ const Stopwatch& stopwatch,
+ const CheckStatusInfo& result)
+{
+ VLOG(1) << "Performed " << check.type() << " check for task " << taskId
+ << " in " << stopwatch.elapsed();
+
+ // Trigger the callback if the check status changes.
+ if (result != previousCheckStatus) {
+ // We assume this is a local send, i.e., the checker library is not used
+ // in a binary external to the executor and hence cannot exit before
+ // the data is sent to the executor.
+ updateCallback(result);
+ previousCheckStatus = result;
+ }
+ // The next check is scheduled whether or not an update was sent.
+ scheduleNext(checkInterval);
+}
+
+// Launches the check command as a subprocess; the future holds its wait status.
+Future<int> CheckerProcess::commandCheck()
+{
+ CHECK_EQ(CheckInfo::COMMAND, check.type());
+ CHECK(check.has_command());
+
+ const CommandInfo& command = check.command().command();
+
+ map<string, string> environment = os::environment();
+ // Variables from the command override same-named inherited variables.
+ foreach (const Environment::Variable& variable,
+ command.environment().variables()) {
+ environment[variable.name()] = variable.value();
+ }
+
+ // Launch the subprocess.
+ Try<Subprocess> s = Error("Not launched");
+
+ if (command.shell()) {
+ // Use the shell variant.
+ VLOG(1) << "Launching command check '" << command.value() << "'"
+ << " for task " << taskId;
+
+ s = process::subprocess(
+ command.value(),
+ Subprocess::PATH("/dev/null"),
+ Subprocess::FD(STDERR_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ environment,
+ clone);
+ } else {
+ // Use the exec variant.
+ vector<string> argv(
+ std::begin(command.arguments()), std::end(command.arguments()));
+
+ VLOG(1) << "Launching command check [" << command.value() << ", "
+ << strings::join(", ", argv) << "] for task " << taskId;
+
+ s = process::subprocess(
+ command.value(),
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::FD(STDERR_FILENO),
+ Subprocess::FD(STDERR_FILENO),
+ nullptr,
+ environment,
+ clone);
+ }
+
+ if (s.isError()) {
+ return Failure("Failed to create subprocess: " + s.error());
+ }
+
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once it is available.
+ const pid_t commandPid = s->pid();
+ const Duration timeout = checkTimeout;
+ const TaskID _taskId = taskId;
+ // On timeout the status future is discarded and the process tree is killed.
+ return s->status()
+ .after(
+ timeout,
+ [timeout, commandPid, _taskId](Future<Option<int>> future) {
+ future.discard();
+
+ if (commandPid != -1) {
+ // Cleanup the external command process.
+ VLOG(1) << "Killing the command check process " << commandPid
+ << " for task " << _taskId;
+
+ os::killtree(commandPid, SIGKILL);
+ }
+
+ return Failure(
+ "Command timed out after " + stringify(timeout) + "; aborting");
+ })
+ .then([](const Option<int>& exitCode) -> Future<int> {
+ if (exitCode.isNone()) {
+ return Failure("Failed to reap the command process");
+ }
+ // Still a raw wait status; the result handler applies WEXITSTATUS.
+ return exitCode.get();
+ });
+}
+
+// Converts the command check outcome into a `CheckStatusInfo` and processes it.
+void CheckerProcess::processCommandCheckResult(
+ const Stopwatch& stopwatch,
+ const Future<int>& result)
+{
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
+ // An exit code is set only for a ready result that passes WIFEXITED.
+ if (result.isReady() && WIFEXITED(result.get())) {
+ const int exitCode = WEXITSTATUS(result.get());
+ VLOG(1) << check.type() << " check for task "
+ << taskId << " returned " << exitCode;
+
+ checkStatusInfo.mutable_command()->set_exit_code(
+ static_cast<int32_t>(exitCode));
+ } else {
+ // Check's status is currently not available, which may indicate a change
+ // that should be reported as an empty `CheckStatusInfo.Command` message.
+ LOG(WARNING) << "Check for task " << taskId << " failed: "
+ << (result.isFailed() ? result.failure() : "discarded");
+
+ checkStatusInfo.mutable_command();
+ }
+
+ processCheckResult(stopwatch, checkStatusInfo);
+}
+
+// Launches curl against the task port; the future holds the HTTP status code.
+Future<int> CheckerProcess::httpCheck()
+{
+ CHECK_EQ(CheckInfo::HTTP, check.type());
+ CHECK(check.has_http());
+
+ const CheckInfo::Http& http = check.http();
+ // The URL always uses DEFAULT_HTTP_SCHEME and DEFAULT_DOMAIN.
+ const string scheme = DEFAULT_HTTP_SCHEME;
+ const string path = http.has_path() ? http.path() : "";
+ const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
+ stringify(http.port()) + path;
+
+ VLOG(1) << "Launching HTTP check '" << url << "' for task " << taskId;
+
+ const vector<string> argv = {
+ HTTP_CHECK_COMMAND,
+ "-s", // Don't show progress meter or error messages.
+ "-S", // Makes curl show an error message if it fails.
+ "-L", // Follows HTTP 3xx redirects.
+ "-k", // Ignores SSL validation when scheme is https.
+ "-w", "%{http_code}", // Displays HTTP response code on stdout.
+ "-o", "/dev/null", // Ignores output.
+ url
+ };
+
+ // TODO(alexr): Consider launching the helper binary once per task lifetime,
+ // see MESOS-6766.
+ Try<Subprocess> s = process::subprocess(
+ HTTP_CHECK_COMMAND,
+ argv,
+ Subprocess::PATH("/dev/null"),
+ Subprocess::PIPE(),
+ Subprocess::PIPE(),
+ nullptr,
+ None(),
+ clone);
+
+ if (s.isError()) {
+ return Failure(
+ "Failed to create the " + string(HTTP_CHECK_COMMAND) +
+ " subprocess: " + s.error());
+ }
+
+ // TODO(alexr): Use lambda named captures for
+ // these cached values once it is available.
+ const pid_t curlPid = s->pid();
+ const Duration timeout = checkTimeout;
+ const TaskID _taskId = taskId;
+ // Await the exit status and both output streams, bounded by the timeout.
+ return await(
+ s->status(),
+ process::io::read(s->out().get()),
+ process::io::read(s->err().get()))
+ .after(
+ timeout,
+ [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>,
+ Future<string>,
+ Future<string>>> future) {
+ future.discard();
+
+ if (curlPid != -1) {
+ // Cleanup the HTTP_CHECK_COMMAND process.
+ VLOG(1) << "Killing the HTTP check process " << curlPid
+ << " for task " << _taskId;
+
+ os::killtree(curlPid, SIGKILL);
+ }
+
+ return Failure(
+ string(HTTP_CHECK_COMMAND) + " timed out after " +
+ stringify(timeout) + "; aborting");
+ })
+ .then(defer(self(), &Self::_httpCheck, lambda::_1));
+}
+
+// Turns the awaited exit status, stdout and stderr into an HTTP status code.
+Future<int> CheckerProcess::_httpCheck(
+ const tuple<
+ Future<Option<int>>,
+ Future<string>,
+ Future<string>>& t)
+{
+ Future<Option<int>> status = std::get<0>(t);
+ if (!status.isReady()) {
+ return Failure(
+ "Failed to get the exit status of the " + string(HTTP_CHECK_COMMAND) +
+ " process: " + (status.isFailed() ? status.failure() : "discarded"));
+ }
+
+ if (status->isNone()) {
+ return Failure(
+ "Failed to reap the " + string(HTTP_CHECK_COMMAND) + " process");
+ }
+ // Any non-zero status is a failure; stderr is included when it was read.
+ int exitCode = status->get();
+ if (exitCode != 0) {
+ Future<string> error = std::get<2>(t);
+ if (!error.isReady()) {
+ return Failure(
+ string(HTTP_CHECK_COMMAND) + " returned " +
+ WSTRINGIFY(exitCode) + "; reading stderr failed: " +
+ (error.isFailed() ? error.failure() : "discarded"));
+ }
+
+ return Failure(
+ string(HTTP_CHECK_COMMAND) + " returned " +
+ WSTRINGIFY(exitCode) + ": " + error.get());
+ }
+ // From here on the command exited with status zero; stdout is parsed.
+ Future<string> output = std::get<1>(t);
+ if (!output.isReady()) {
+ return Failure(
+ "Failed to read stdout from " + string(HTTP_CHECK_COMMAND) + ": " +
+ (output.isFailed() ? output.failure() : "discarded"));
+ }
+
+ // Parse the output and get the HTTP status code.
+ Try<int> statusCode = numify<int>(output.get());
+ if (statusCode.isError()) {
+ return Failure(
+ "Unexpected output from " + string(HTTP_CHECK_COMMAND) + ": " +
+ output.get());
+ }
+
+ return statusCode.get();
+}
+
+// Converts the HTTP check outcome into a `CheckStatusInfo` and processes it.
+void CheckerProcess::processHttpCheckResult(
+ const Stopwatch& stopwatch,
+ const process::Future<int>& result)
+{
+ CheckStatusInfo checkStatusInfo;
+ checkStatusInfo.set_type(check.type());
+ // A status code is set only when the check future is ready.
+ if (result.isReady()) {
+ VLOG(1) << check.type() << " check for task "
+ << taskId << " returned " << result.get();
+
+ checkStatusInfo.mutable_http()->set_status_code(
+ static_cast<uint32_t>(result.get()));
+ } else {
+ // Check's status is currently not available, which may indicate a change
+ // that should be reported as an empty `CheckStatusInfo.Http` message.
+ LOG(WARNING) << "Check for task " << taskId << " failed: "
+ << (result.isFailed() ? result.failure() : "discarded");
+
+ checkStatusInfo.mutable_http();
+ }
+
+ processCheckResult(stopwatch, checkStatusInfo);
+}
+
namespace validation {
Option<Error> checkInfo(const CheckInfo& checkInfo)
http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index dc293f3..e8af316 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -17,15 +17,68 @@
#ifndef __CHECKER_HPP__
#define __CHECKER_HPP__
+#include <string>
+#include <vector>
+
#include <mesos/mesos.hpp>
+#include <process/owned.hpp>
+
#include <stout/error.hpp>
+#include <stout/lambda.hpp>
#include <stout/option.hpp>
+#include <stout/try.hpp>
namespace mesos {
namespace internal {
namespace checks {
+// Forward declarations.
+class CheckerProcess;
+// Handle that owns a `CheckerProcess`; instances come from `create()`.
+class Checker
+{
+public:
+ /**
+ * Attempts to create a `Checker` object. In case of success, the first
+ * check is scheduled on initialization, after the configured check delay.
+ *
+ * @param checkInfo The protobuf message definition of a check.
+ * @param callback A callback `Checker` uses to send check status updates
+ * to its owner (usually an executor).
+ * @param taskId The TaskID of the target task.
+ * @param taskPid The target task's pid used to enter the specified
+ * namespaces.
+ * @param namespaces The namespaces to enter prior to performing the check.
+ * @return A `Checker` object or an error if `create` fails.
+ *
+ * @todo A better approach would be to return a stream of updates, e.g.,
+ * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
+ */
+ static Try<process::Owned<Checker>> create(
+ const CheckInfo& checkInfo,
+ const lambda::function<void(const CheckStatusInfo&)>& callback,
+ const TaskID& taskId,
+ const Option<pid_t>& taskPid,
+ const std::vector<std::string>& namespaces);
+
+ ~Checker();
+
+ // Not copyable, not assignable.
+ Checker(const Checker&) = delete;
+ Checker& operator=(const Checker&) = delete;
+
+ /**
+ * Immediately stops checking. Any in-flight checks are dropped.
+ */
+ void stop();
+
+private:
+ explicit Checker(process::Owned<CheckerProcess> process);
+ // The process that performs the checks.
+ process::Owned<CheckerProcess> process;
+};
+
namespace validation {
// TODO(alexr): A better place for these functions would be something like
http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 6c97369..7efefe9 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -92,7 +92,7 @@ static const string DEFAULT_DOMAIN = "127.0.0.1";
#ifdef __linux__
// TODO(alexr): Instead of defining this ad-hoc clone function, provide a
// general solution for entering namespaces in child processes, see MESOS-6184.
-pid_t cloneWithSetns(
+static pid_t cloneWithSetns(
const lambda::function<int()>& func,
const Option<pid_t>& taskPid,
const vector<string>& namespaces)
http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index f1f2834..44df544 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -64,6 +64,9 @@ public:
*
* @todo A better approach would be to return a stream of updates, e.g.,
* `process::Stream<TaskHealthStatus>` rather than invoking a callback.
+ *
+ * @todo Consider leveraging `checks::Checker` for checking functionality.
+ * This class will then focus on interpreting and acting on the result.
*/
static Try<process::Owned<HealthChecker>> create(
const HealthCheck& check,