You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/15 21:21:09 UTC

[08/13] mesos git commit: Updated checks library with general check support.

Updated checks library with general check support.

Add support for general checks, i.e., those defined by CheckInfo, in
the checking library. A general check can be either a command or
an HTTP request. The library performs the requested check at
the specified interval and sends the result to the framework
via a task status update. If the current result is the same as
the previous result, no status update is sent.

Review: https://reviews.apache.org/r/56208


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f042bf60
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f042bf60
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f042bf60

Branch: refs/heads/master
Commit: f042bf607f9bdcf4d350df892b6985760a7ea7e8
Parents: bf196f0
Author: Alexander Rukletsov <al...@apache.org>
Authored: Sun Feb 5 21:53:51 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Mar 15 22:20:20 2017 +0100

----------------------------------------------------------------------
 src/checks/checker.cpp        | 578 ++++++++++++++++++++++++++++++++++++-
 src/checks/checker.hpp        |  53 ++++
 src/checks/health_checker.cpp |   2 +-
 src/checks/health_checker.hpp |   3 +
 4 files changed, 634 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 8716e4c..94315d7 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -16,20 +16,596 @@
 
 #include "checks/checker.hpp"
 
+#include <cstdint>
+#include <iterator>
+#include <map>
 #include <string>
+#include <tuple>
+#include <vector>
 
-#include <mesos/mesos.hpp>
+#include <glog/logging.h>
 
+#include <mesos/mesos.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/collect.hpp>
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/time.hpp>
+
+#include <stout/check.hpp>
+#include <stout/duration.hpp>
+#include <stout/exit.hpp>
+#include <stout/foreach.hpp>
+#include <stout/jsonify.hpp>
+#include <stout/nothing.hpp>
+#include <stout/option.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/stopwatch.hpp>
 #include <stout/strings.hpp>
+#include <stout/try.hpp>
+#include <stout/unreachable.hpp>
+
+#include <stout/os/environment.hpp>
+#include <stout/os/killtree.hpp>
 
+#include "common/protobuf_utils.hpp"
+#include "common/status_utils.hpp"
 #include "common/validation.hpp"
 
+#ifdef __linux__
+#include "linux/ns.hpp"
+#endif
+
+using process::Failure;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+
+using std::map;
 using std::string;
+using std::tuple;
+using std::vector;
 
 namespace mesos {
 namespace internal {
 namespace checks {
 
+#ifndef __WINDOWS__
+constexpr char HTTP_CHECK_COMMAND[] = "curl";
+#else
+constexpr char HTTP_CHECK_COMMAND[] = "curl.exe";
+#endif // __WINDOWS__
+
+static const string DEFAULT_HTTP_SCHEME = "http";
+
+// Use '127.0.0.1' instead of 'localhost', because the hosts
+// file in some container images may not contain 'localhost'.
+static const string DEFAULT_DOMAIN = "127.0.0.1";
+
+
+#ifdef __linux__
+// TODO(alexr): Instead of defining this ad-hoc clone function, provide a
+// general solution for entering namespaces in child processes, see MESOS-6184.
+static pid_t cloneWithSetns(
+    const lambda::function<int()>& func,
+    const Option<pid_t>& taskPid,
+    const vector<string>& namespaces)
+{
+  return process::defaultClone([=]() -> int {
+    if (taskPid.isSome()) {
+      foreach (const string& ns, namespaces) {
+        Try<Nothing> setns = ns::setns(taskPid.get(), ns);
+        if (setns.isError()) {
+          // This effectively aborts the check.
+          LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
+                     << "task (pid: '" << taskPid.get() << "'): "
+                     << setns.error();
+        }
+
+        VLOG(1) << "Entered the " << ns << " namespace of "
+                << "task (pid: '" << taskPid.get() << "') successfully";
+      }
+    }
+
+    return func();
+  });
+}
+#endif
+
+
+class CheckerProcess : public ProtobufProcess<CheckerProcess>
+{
+public:
+  CheckerProcess(
+      const CheckInfo& _check,
+      const lambda::function<void(const CheckStatusInfo&)>& _callback,
+      const TaskID& _taskId,
+      const Option<pid_t>& _taskPid,
+      const std::vector<std::string>& _namespaces);
+
+  virtual ~CheckerProcess() {}
+
+protected:
+  void initialize() override;
+  void finalize() override;
+
+private:
+  void performCheck();
+  void scheduleNext(const Duration& duration);
+  void processCheckResult(
+      const Stopwatch& stopwatch,
+      const CheckStatusInfo& result);
+
+  process::Future<int> commandCheck();
+  void processCommandCheckResult(
+      const Stopwatch& stopwatch,
+      const process::Future<int>& result);
+
+  process::Future<int> httpCheck();
+  process::Future<int> _httpCheck(
+      const std::tuple<
+          process::Future<Option<int>>,
+          process::Future<std::string>,
+          process::Future<std::string>>& t);
+  void processHttpCheckResult(
+      const Stopwatch& stopwatch,
+      const process::Future<int>& result);
+
+  const CheckInfo check;
+  Duration checkDelay;
+  Duration checkInterval;
+  Duration checkTimeout;
+
+  const lambda::function<void(const CheckStatusInfo&)> updateCallback;
+  const TaskID taskId;
+  const Option<pid_t> taskPid;
+  const std::vector<std::string> namespaces;
+  Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
+
+  CheckStatusInfo previousCheckStatus;
+};
+
+
+Try<Owned<Checker>> Checker::create(
+    const CheckInfo& check,
+    const lambda::function<void(const CheckStatusInfo&)>& callback,
+    const TaskID& taskId,
+    const Option<pid_t>& taskPid,
+    const vector<string>& namespaces)
+{
+  // Validate the `CheckInfo` protobuf.
+  Option<Error> error = validation::checkInfo(check);
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  Owned<CheckerProcess> process(new CheckerProcess(
+      check,
+      callback,
+      taskId,
+      taskPid,
+      namespaces));
+
+  return Owned<Checker>(new Checker(process));
+}
+
+
+Checker::Checker(Owned<CheckerProcess> _process)
+  : process(_process)
+{
+  spawn(CHECK_NOTNULL(process.get()));
+}
+
+
+Checker::~Checker()
+{
+  terminate(process.get());
+  wait(process.get());
+}
+
+
+void Checker::stop()
+{
+  terminate(process.get(), true);
+}
+
+
+CheckerProcess::CheckerProcess(
+    const CheckInfo& _check,
+    const lambda::function<void(const CheckStatusInfo&)>& _callback,
+    const TaskID& _taskId,
+    const Option<pid_t>& _taskPid,
+    const vector<string>& _namespaces)
+  : ProcessBase(process::ID::generate("checker")),
+    check(_check),
+    updateCallback(_callback),
+    taskId(_taskId),
+    taskPid(_taskPid),
+    namespaces(_namespaces)
+{
+  Try<Duration> create = Duration::create(check.delay_seconds());
+  CHECK_SOME(create);
+  checkDelay = create.get();
+
+  create = Duration::create(check.interval_seconds());
+  CHECK_SOME(create);
+  checkInterval = create.get();
+
+  // Zero value means infinite timeout.
+  create = Duration::create(check.timeout_seconds());
+  CHECK_SOME(create);
+  checkTimeout =
+    (create.get() > Duration::zero()) ? create.get() : Duration::max();
+
+  // The first check update should be sent only when a check succeeds,
+  // hence we should deduplicate against a corresponding "empty" result.
+  previousCheckStatus.set_type(check.type());
+  switch (check.type()) {
+    case CheckInfo::COMMAND: {
+      previousCheckStatus.mutable_command();
+      break;
+    }
+
+    case CheckInfo::HTTP: {
+      previousCheckStatus.mutable_http();
+      break;
+    }
+
+    case CheckInfo::UNKNOWN: {
+      LOG(FATAL) << "Received UNKNOWN check type";
+      break;
+    }
+  }
+
+#ifdef __linux__
+  if (!namespaces.empty()) {
+    clone = lambda::bind(&cloneWithSetns, lambda::_1, taskPid, namespaces);
+  }
+#endif
+}
+
+
+void CheckerProcess::initialize()
+{
+  VLOG(1) << "Check configuration for task " << taskId << ":"
+          << " '" << jsonify(JSON::Protobuf(check)) << "'";
+
+  scheduleNext(checkDelay);
+}
+
+
+void CheckerProcess::finalize()
+{
+  LOG(INFO) << "Checking for task " << taskId << " stopped";
+}
+
+
+void CheckerProcess::performCheck()
+{
+  Stopwatch stopwatch;
+  stopwatch.start();
+
+  switch (check.type()) {
+    case CheckInfo::COMMAND: {
+      commandCheck().onAny(defer(
+          self(),
+          &Self::processCommandCheckResult, stopwatch, lambda::_1));
+      break;
+    }
+
+    case CheckInfo::HTTP: {
+      httpCheck().onAny(defer(
+          self(),
+          &Self::processHttpCheckResult, stopwatch, lambda::_1));
+      break;
+    }
+
+    case CheckInfo::UNKNOWN: {
+      LOG(FATAL) << "Received UNKNOWN check type";
+      break;
+    }
+  }
+}
+
+
+void CheckerProcess::scheduleNext(const Duration& duration)
+{
+  VLOG(1) << "Scheduling check for task " << taskId << " in " << duration;
+
+  delay(duration, self(), &Self::performCheck);
+}
+
+
+void CheckerProcess::processCheckResult(
+    const Stopwatch& stopwatch,
+    const CheckStatusInfo& result)
+{
+  VLOG(1) << "Performed " << check.type() << " check for task " << taskId
+          << " in " << stopwatch.elapsed();
+
+  // Trigger the callback only if the check status has changed.
+  if (result != previousCheckStatus) {
+    // We assume this is a local send, i.e., the checker library is not used
+    // in a binary external to the executor and hence can not exit before
+    // the data is sent to the executor.
+    updateCallback(result);
+    previousCheckStatus = result;
+  }
+
+  scheduleNext(checkInterval);
+}
+
+
+Future<int> CheckerProcess::commandCheck()
+{
+  CHECK_EQ(CheckInfo::COMMAND, check.type());
+  CHECK(check.has_command());
+
+  const CommandInfo& command = check.command().command();
+
+  map<string, string> environment = os::environment();
+
+  foreach (const Environment::Variable& variable,
+           command.environment().variables()) {
+    environment[variable.name()] = variable.value();
+  }
+
+  // Launch the subprocess.
+  Try<Subprocess> s = Error("Not launched");
+
+  if (command.shell()) {
+    // Use the shell variant.
+    VLOG(1) << "Launching command check '" << command.value() << "'"
+            << " for task " << taskId;
+
+    s = process::subprocess(
+        command.value(),
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        environment,
+        clone);
+  } else {
+    // Use the exec variant.
+    vector<string> argv(
+        std::begin(command.arguments()), std::end(command.arguments()));
+
+    VLOG(1) << "Launching command check [" << command.value() << ", "
+            << strings::join(", ", argv) << "] for task " << taskId;
+
+    s = process::subprocess(
+        command.value(),
+        argv,
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        nullptr,
+        environment,
+        clone);
+  }
+
+  if (s.isError()) {
+    return Failure("Failed to create subprocess: " + s.error());
+  }
+
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
+  const pid_t commandPid = s->pid();
+  const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
+
+  return s->status()
+    .after(
+        timeout,
+        [timeout, commandPid, _taskId](Future<Option<int>> future) {
+      future.discard();
+
+      if (commandPid != -1) {
+        // Cleanup the external command process.
+        VLOG(1) << "Killing the command check process " << commandPid
+                << " for task " << _taskId;
+
+        os::killtree(commandPid, SIGKILL);
+      }
+
+      return Failure(
+          "Command timed out after " + stringify(timeout) + "; aborting");
+    })
+    .then([](const Option<int>& exitCode) -> Future<int> {
+      if (exitCode.isNone()) {
+        return Failure("Failed to reap the command process");
+      }
+
+      return exitCode.get();
+    });
+}
+
+
+void CheckerProcess::processCommandCheckResult(
+    const Stopwatch& stopwatch,
+    const Future<int>& result)
+{
+  CheckStatusInfo checkStatusInfo;
+  checkStatusInfo.set_type(check.type());
+
+  if (result.isReady() && WIFEXITED(result.get())) {
+    const int exitCode = WEXITSTATUS(result.get());
+    VLOG(1) << check.type() << " check for task "
+            << taskId << " returned " << exitCode;
+
+    checkStatusInfo.mutable_command()->set_exit_code(
+        static_cast<int32_t>(exitCode));
+  } else {
+    // Check's status is currently not available, which may indicate a change
+    // that should be reported as an empty `CheckStatusInfo.Command` message.
+    LOG(WARNING) << "Check for task " << taskId << " failed: "
+                 << (result.isFailed() ? result.failure() : "discarded");
+
+    checkStatusInfo.mutable_command();
+  }
+
+  processCheckResult(stopwatch, checkStatusInfo);
+}
+
+
+Future<int> CheckerProcess::httpCheck()
+{
+  CHECK_EQ(CheckInfo::HTTP, check.type());
+  CHECK(check.has_http());
+
+  const CheckInfo::Http& http = check.http();
+
+  const string scheme = DEFAULT_HTTP_SCHEME;
+  const string path = http.has_path() ? http.path() : "";
+  const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
+                     stringify(http.port()) + path;
+
+  VLOG(1) << "Launching HTTP check '" << url << "' for task " << taskId;
+
+  const vector<string> argv = {
+    HTTP_CHECK_COMMAND,
+    "-s",                 // Don't show progress meter or error messages.
+    "-S",                 // Makes curl show an error message if it fails.
+    "-L",                 // Follows HTTP 3xx redirects.
+    "-k",                 // Ignores SSL validation when scheme is https.
+    "-w", "%{http_code}", // Displays HTTP response code on stdout.
+    "-o", "/dev/null",    // Ignores output.
+    url
+  };
+
+  // TODO(alexr): Consider launching the helper binary once per task lifetime,
+  // see MESOS-6766.
+  Try<Subprocess> s = process::subprocess(
+      HTTP_CHECK_COMMAND,
+      argv,
+      Subprocess::PATH("/dev/null"),
+      Subprocess::PIPE(),
+      Subprocess::PIPE(),
+      nullptr,
+      None(),
+      clone);
+
+  if (s.isError()) {
+    return Failure(
+        "Failed to create the " + string(HTTP_CHECK_COMMAND) +
+        " subprocess: " + s.error());
+  }
+
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
+  const pid_t curlPid = s->pid();
+  const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
+
+  return await(
+      s->status(),
+      process::io::read(s->out().get()),
+      process::io::read(s->err().get()))
+    .after(
+        timeout,
+        [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>,
+                                                 Future<string>,
+                                                 Future<string>>> future) {
+      future.discard();
+
+      if (curlPid != -1) {
+        // Cleanup the HTTP_CHECK_COMMAND process.
+        VLOG(1) << "Killing the HTTP check process " << curlPid
+                << " for task " << _taskId;
+
+        os::killtree(curlPid, SIGKILL);
+      }
+
+      return Failure(
+          string(HTTP_CHECK_COMMAND) + " timed out after " +
+          stringify(timeout) + "; aborting");
+    })
+    .then(defer(self(), &Self::_httpCheck, lambda::_1));
+}
+
+
+Future<int> CheckerProcess::_httpCheck(
+    const tuple<
+        Future<Option<int>>,
+        Future<string>,
+        Future<string>>& t)
+{
+  Future<Option<int>> status = std::get<0>(t);
+  if (!status.isReady()) {
+    return Failure(
+        "Failed to get the exit status of the " + string(HTTP_CHECK_COMMAND) +
+        " process: " + (status.isFailed() ? status.failure() : "discarded"));
+  }
+
+  if (status->isNone()) {
+    return Failure(
+        "Failed to reap the " + string(HTTP_CHECK_COMMAND) + " process");
+  }
+
+  int exitCode = status->get();
+  if (exitCode != 0) {
+    Future<string> error = std::get<2>(t);
+    if (!error.isReady()) {
+      return Failure(
+          string(HTTP_CHECK_COMMAND) + " returned " +
+          WSTRINGIFY(exitCode) + "; reading stderr failed: " +
+          (error.isFailed() ? error.failure() : "discarded"));
+    }
+
+    return Failure(
+        string(HTTP_CHECK_COMMAND) + " returned " +
+        WSTRINGIFY(exitCode) + ": " + error.get());
+  }
+
+  Future<string> output = std::get<1>(t);
+  if (!output.isReady()) {
+    return Failure(
+        "Failed to read stdout from " + string(HTTP_CHECK_COMMAND) + ": " +
+        (output.isFailed() ? output.failure() : "discarded"));
+  }
+
+  // Parse the output and get the HTTP status code.
+  Try<int> statusCode = numify<int>(output.get());
+  if (statusCode.isError()) {
+    return Failure(
+        "Unexpected output from " + string(HTTP_CHECK_COMMAND) + ": " +
+        output.get());
+  }
+
+  return statusCode.get();
+}
+
+
+void CheckerProcess::processHttpCheckResult(
+    const Stopwatch& stopwatch,
+    const process::Future<int>& result)
+{
+  CheckStatusInfo checkStatusInfo;
+  checkStatusInfo.set_type(check.type());
+
+  if (result.isReady()) {
+    VLOG(1) << check.type() << " check for task "
+            << taskId << " returned " << result.get();
+
+    checkStatusInfo.mutable_http()->set_status_code(
+        static_cast<uint32_t>(result.get()));
+  } else {
+    // Check's status is currently not available, which may indicate a change
+    // that should be reported as an empty `CheckStatusInfo.Http` message.
+    LOG(WARNING) << "Check for task " << taskId << " failed: "
+                 << (result.isFailed() ? result.failure() : "discarded");
+
+    checkStatusInfo.mutable_http();
+  }
+
+  processCheckResult(stopwatch, checkStatusInfo);
+}
+
 namespace validation {
 
 Option<Error> checkInfo(const CheckInfo& checkInfo)

http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index dc293f3..e8af316 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -17,15 +17,68 @@
 #ifndef __CHECKER_HPP__
 #define __CHECKER_HPP__
 
+#include <string>
+#include <vector>
+
 #include <mesos/mesos.hpp>
 
+#include <process/owned.hpp>
+
 #include <stout/error.hpp>
+#include <stout/lambda.hpp>
 #include <stout/option.hpp>
+#include <stout/try.hpp>
 
 namespace mesos {
 namespace internal {
 namespace checks {
 
+// Forward declarations.
+class CheckerProcess;
+
+class Checker
+{
+public:
+  /**
+   * Attempts to create a `Checker` object. In case of success, checking
+   * starts immediately after initialization.
+   *
+   * @param checkInfo The protobuf message definition of a check.
+   * @param callback A callback `Checker` uses to send check status updates
+   *     to its owner (usually an executor).
+   * @param taskId The TaskID of the target task.
+   * @param taskPid The target task's pid used to enter the specified
+   *     namespaces.
+   * @param namespaces The namespaces to enter prior to performing the check.
+   * @return A `Checker` object or an error if `create` fails.
+   *
+   * @todo A better approach would be to return a stream of updates, e.g.,
+   * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
+   */
+  static Try<process::Owned<Checker>> create(
+      const CheckInfo& checkInfo,
+      const lambda::function<void(const CheckStatusInfo&)>& callback,
+      const TaskID& taskId,
+      const Option<pid_t>& taskPid,
+      const std::vector<std::string>& namespaces);
+
+  ~Checker();
+
+  // Not copyable, not assignable.
+  Checker(const Checker&) = delete;
+  Checker& operator=(const Checker&) = delete;
+
+  /**
+   * Immediately stops checking. Any in-flight checks are dropped.
+   */
+  void stop();
+
+private:
+  explicit Checker(process::Owned<CheckerProcess> process);
+
+  process::Owned<CheckerProcess> process;
+};
+
 namespace validation {
 
 // TODO(alexr): A better place for these functions would be something like

http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 6c97369..7efefe9 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -92,7 +92,7 @@ static const string DEFAULT_DOMAIN = "127.0.0.1";
 #ifdef __linux__
 // TODO(alexr): Instead of defining this ad-hoc clone function, provide a
 // general solution for entring namespace in child processes, see MESOS-6184.
-pid_t cloneWithSetns(
+static pid_t cloneWithSetns(
     const lambda::function<int()>& func,
     const Option<pid_t>& taskPid,
     const vector<string>& namespaces)

http://git-wip-us.apache.org/repos/asf/mesos/blob/f042bf60/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index f1f2834..44df544 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -64,6 +64,9 @@ public:
    *
    * @todo A better approach would be to return a stream of updates, e.g.,
    * `process::Stream<TaskHealthStatus>` rather than invoking a callback.
+   *
+   * @todo Consider leveraging `checks::Checker` for checking functionality.
+   * This class will then focus on interpreting and acting on the result.
    */
   static Try<process::Owned<HealthChecker>> create(
       const HealthCheck& check,