Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/24 04:36:09 UTC

[3/6] mesos git commit: Added support for COMMAND health checks to default executor.

Added support for COMMAND health checks to default executor.

Review: https://reviews.apache.org/r/55901/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4bbfaebb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4bbfaebb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4bbfaebb

Branch: refs/heads/master
Commit: 4bbfaebb793b3f08e9acf1f358e881da02b4a068
Parents: b97d682
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Fri Mar 24 00:48:50 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 00:56:44 2017 +0100

----------------------------------------------------------------------
 src/checks/health_checker.cpp     | 256 ++++++++++++++++++++++++++++++++-
 src/checks/health_checker.hpp     |  74 +++++++++-
 src/launcher/default_executor.cpp |  12 +-
 src/tests/health_check_tests.cpp  | 113 +++++++++++++++
 4 files changed, 445 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
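
For context, the framework-side usage this commit enables: a task launched in a task group via the default executor may now carry a COMMAND health check. A minimal sketch mirroring the new test below (function and script names are illustrative, and the timing values are examples, not defaults):

#include <mesos/mesos.hpp>

// Build a COMMAND health check; with this commit the default executor
// runs the command in a nested container via the agent API instead of
// refusing to launch the task.
mesos::HealthCheck makeCommandHealthCheck()
{
  mesos::HealthCheck healthCheck;
  healthCheck.set_type(mesos::HealthCheck::COMMAND);

  // Hypothetical check script; any shell command works.
  healthCheck.mutable_command()->set_value("my_check.sh");

  healthCheck.set_delay_seconds(0);         // Illustrative values.
  healthCheck.set_interval_seconds(10);
  healthCheck.set_grace_period_seconds(5);

  return healthCheck;
}

// Usage: task.mutable_health_check()->CopyFrom(makeCommandHealthCheck());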


http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 236d07b..3a7de78 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -29,6 +29,8 @@
 
 #include <mesos/mesos.hpp>
 
+#include <mesos/agent/agent.hpp>
+
 #include <process/collect.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
@@ -50,9 +52,12 @@
 #include <stout/os/constants.hpp>
 #include <stout/os/killtree.hpp>
 
+#include "common/http.hpp"
 #include "common/status_utils.hpp"
 #include "common/validation.hpp"
 
+#include "internal/evolve.hpp"
+
 #ifdef __linux__
 #include "linux/ns.hpp"
 #endif
@@ -66,6 +71,9 @@ using process::Owned;
 using process::Subprocess;
 using process::Time;
 
+using process::http::Connection;
+using process::http::Response;
+
 using std::map;
 using std::string;
 using std::tuple;
@@ -140,7 +148,39 @@ Try<Owned<HealthChecker>> HealthChecker::create(
       callback,
       taskId,
       taskPid,
-      namespaces));
+      namespaces,
+      None(),
+      None(),
+      false));
+
+  return Owned<HealthChecker>(new HealthChecker(process));
+}
+
+
+Try<Owned<HealthChecker>> HealthChecker::create(
+    const HealthCheck& check,
+    const string& launcherDir,
+    const lambda::function<void(const TaskHealthStatus&)>& callback,
+    const TaskID& taskId,
+    const ContainerID& taskContainerId,
+    const process::http::URL& agentURL)
+{
+  // Validate the 'HealthCheck' protobuf.
+  Option<Error> error = validation::healthCheck(check);
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  Owned<HealthCheckerProcess> process(new HealthCheckerProcess(
+      check,
+      launcherDir,
+      callback,
+      taskId,
+      None(),
+      {},
+      taskContainerId,
+      agentURL,
+      true));
 
   return Owned<HealthChecker>(new HealthChecker(process));
 }
@@ -175,7 +215,10 @@ HealthCheckerProcess::HealthCheckerProcess(
     const lambda::function<void(const TaskHealthStatus&)>& _callback,
     const TaskID& _taskId,
     const Option<pid_t>& _taskPid,
-    const vector<string>& _namespaces)
+    const vector<string>& _namespaces,
+    const Option<ContainerID>& _taskContainerId,
+    const Option<process::http::URL>& _agentURL,
+    bool _commandCheckViaAgent)
   : ProcessBase(process::ID::generate("health-checker")),
     check(_check),
     launcherDir(_launcherDir),
@@ -183,6 +226,9 @@ HealthCheckerProcess::HealthCheckerProcess(
     taskId(_taskId),
     taskPid(_taskPid),
     namespaces(_namespaces),
+    taskContainerId(_taskContainerId),
+    agentURL(_agentURL),
+    commandCheckViaAgent(_commandCheckViaAgent),
     consecutiveFailures(0),
     initializing(true)
 {
@@ -285,7 +331,8 @@ void HealthCheckerProcess::performSingleCheck()
 
   switch (check.type()) {
     case HealthCheck::COMMAND: {
-      checkResult = commandHealthCheck();
+      checkResult = commandCheckViaAgent ? nestedCommandHealthCheck()
+                                         : commandHealthCheck();
       break;
     }
 
@@ -417,6 +464,209 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
 }
 
 
+Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
+{
+  CHECK_EQ(HealthCheck::COMMAND, check.type());
+  CHECK_SOME(taskContainerId);
+  CHECK(check.has_command());
+  CHECK_SOME(agentURL);
+
+  VLOG(1) << "Launching command health check of task " << stringify(taskId);
+
+  return process::http::connect(agentURL.get())
+    .repair([](const Future<Connection>& future) {
+      return Failure(
+          "Unable to establish connection with the agent: " + future.failure());
+    })
+    .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1));
+}
+
+
+Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
+    Connection connection)
+{
+  // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe.
+  ContainerID checkContainerId;
+  checkContainerId.set_value(taskContainerId.get().value() + "-health-check");
+  checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get());
+
+  CommandInfo command(check.command());
+
+  agent::Call call;
+  call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+  agent::Call::LaunchNestedContainerSession* launch =
+    call.mutable_launch_nested_container_session();
+
+  launch->mutable_container_id()->CopyFrom(checkContainerId);
+  launch->mutable_command()->CopyFrom(command);
+
+  process::http::Request request;
+  request.method = "POST";
+  request.url = agentURL.get();
+  request.body = serialize(ContentType::PROTOBUF, evolve(call));
+  request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
+                     {"Message-Accept", stringify(ContentType::PROTOBUF)},
+                     {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+  // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
+  // the output of the container. The agent will close the stream once
+  // the container has exited, or kill the container if the client
+  // closes the connection.
+  //
+  // We're calling `Connection::send` with `streamed = false`, so that
+  // it returns an HTTP response of type 'BODY' once the entire response
+  // is received.
+  //
+  // This means that this future will not be completed until after the
+  // health check command has finished or the connection has been
+  // closed.
+  return connection.send(request, false)
+    .after(checkTimeout,
+           defer(self(),
+                 &Self::nestedCommandHealthCheckTimedOut,
+                 checkContainerId,
+                 connection,
+                 lambda::_1))
+    .then(defer(self(),
+                &Self::__nestedCommandHealthCheck,
+                checkContainerId,
+                lambda::_1));
+}
+
+
+Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck(
+    const ContainerID& checkContainerId,
+    const Response& launchResponse)
+{
+  if (launchResponse.code != process::http::Status::OK) {
+    return Failure(
+        "Received '" + launchResponse.status + "' (" + launchResponse.body +
+        ") while launching a command health check of task '" +
+        stringify(taskId) + "'");
+  }
+
+  // We need to make a copy so that the lambdas can capture it.
+  const TaskID taskId_ = taskId;
+
+  return waitNestedContainer(checkContainerId)
+    .repair([taskId_](const Future<Option<int>>& future) {
+      return Failure(
+          "Unable to get the exit code of command health check of task '" +
+          stringify(taskId_) + "': " + future.failure());
+    })
+    .then([taskId_](const Option<int> status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure(
+            "Unable to get the exit code of command health check of task '" +
+            stringify(taskId_) + "'");
+      } else if (status.get() != 0) {
+        return Failure(
+            "Command health check of task '" + stringify(taskId_) +
+            "' returned " + WSTRINGIFY(status.get()));
+      } else {
+        return Nothing();
+      }
+    });
+}
+
+
+Future<Response>
+HealthCheckerProcess::nestedCommandHealthCheckTimedOut(
+    const ContainerID& checkContainerId,
+    Connection connection,
+    Future<Response> future)
+{
+  future.discard();
+
+  // Closing the connection will make the agent kill the container.
+  connection.disconnect();
+
+  const Failure failure = Failure(
+      "Command health check of task '" + stringify(taskId) +
+      "' has timed out after " + stringify(checkTimeout));
+
+  // We need to make a copy so that the lambda can capture it.
+  const TaskID taskId_ = taskId;
+
+  // If the health check delay interval is zero, we'll try to perform
+  // another health check right after we finish processing the current
+  // timeout.
+  //
+  // All the containers created for the health checks reuse the same
+  // `ContainerID`. In order to prevent conflicts, the future returned
+  // by this method should only be completed once we're sure that the
+  // container has been cleaned up.
+  return waitNestedContainer(checkContainerId)
+    .repair([failure, taskId_](const Future<Option<int>>& waitFuture) {
+      // We assume that once `WaitNestedContainer` returns, irrespective of
+      // whether the response contains a failure, the container will be in a
+      // terminal state, so starting a new health check will not lead to a
+      // transient failure.
+      //
+      // This means that we don't need to retry the `WaitNestedContainer`
+      // call.
+      LOG(WARNING) << "Unable to get the exit code of command health check of "
+                   << "task '" << stringify(taskId_)
+                   << "': " << waitFuture.failure();
+
+      return Future<Option<int>>(failure);
+    })
+    .then([failure](const Option<int>&) {
+      return Future<Response>(failure);
+    });
+}
+
+
+Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
+    const ContainerID& containerId)
+{
+  agent::Call call;
+  call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
+
+  agent::Call::WaitNestedContainer* containerWait =
+    call.mutable_wait_nested_container();
+
+  containerWait->mutable_container_id()->CopyFrom(containerId);
+
+  process::http::Request request;
+  request.method = "POST";
+  request.url = agentURL.get();
+  request.body = serialize(ContentType::PROTOBUF, evolve(call));
+  request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+                     {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+  return process::http::request(request, false)
+    .then(defer(self(),
+                &Self::_waitNestedContainer,
+                containerId,
+                lambda::_1));
+}
+
+
+Future<Option<int>> HealthCheckerProcess::_waitNestedContainer(
+    const ContainerID& containerId,
+    const Response& httpResponse)
+{
+  if (httpResponse.code != process::http::Status::OK) {
+    return Failure(
+        "Received '" + httpResponse.status + "' (" + httpResponse.body +
+        ") while waiting on health check of task " + stringify(taskId));
+  }
+
+  Try<agent::Response> response =
+    deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body);
+  CHECK_SOME(response);
+
+  CHECK(response->has_wait_nested_container());
+
+  return (
+      response->wait_nested_container().has_exit_status()
+        ? Option<int>(response->wait_nested_container().exit_status())
+        : Option<int>::none());
+}
+
+
 Future<Nothing> HealthCheckerProcess::httpHealthCheck()
 {
   CHECK_EQ(HealthCheck::HTTP, check.type());

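Aside: the agent interaction above is easier to see in isolation. The check command is launched with `LAUNCH_NESTED_CONTAINER_SESSION`, and because `Connection::send` is invoked with `streamed = false`, the returned future is satisfied only once the agent closes the stream, i.e. once the check command has exited or the connection has died. A condensed sketch, assuming the same includes and the same `mesos::internal::checks` namespace as the code above, so that `agent::Call`, `serialize`, `evolve`, and `ContentType` resolve exactly as in the diff:

// Launch `command` as a nested session under the task's container;
// the agent kills the session if this connection is closed first.
process::Future<process::http::Response> launchCheckSession(
    process::http::Connection connection,
    const process::http::URL& agentURL,
    const ContainerID& taskContainerId,
    const CommandInfo& command)
{
  // NOTE: Deriving a fixed child ID is the reuse the TODO above
  // flags as unsafe.
  ContainerID checkContainerId;
  checkContainerId.set_value(taskContainerId.value() + "-health-check");
  checkContainerId.mutable_parent()->CopyFrom(taskContainerId);

  agent::Call call;
  call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);

  agent::Call::LaunchNestedContainerSession* launch =
    call.mutable_launch_nested_container_session();
  launch->mutable_container_id()->CopyFrom(checkContainerId);
  launch->mutable_command()->CopyFrom(command);

  process::http::Request request;
  request.method = "POST";
  request.url = agentURL;
  request.body = serialize(ContentType::PROTOBUF, evolve(call));
  request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
                     {"Message-Accept", stringify(ContentType::PROTOBUF)},
                     {"Content-Type", stringify(ContentType::PROTOBUF)}};

  // `streamed = false` buffers the entire streamed response, so this
  // future completes only after the check command finishes.
  return connection.send(request, false);
}
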
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index 44df544..a7307ac 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -51,6 +51,9 @@ public:
    * Attempts to create a `HealthChecker` object. In case of success, health
    * checking starts immediately after initialization.
    *
+   * If the check is a command health check, the checker will fork a process,
+   * enter the task's namespaces, and execute the command.
+   *
    * @param check The protobuf message definition of health check.
    * @param launcherDir A directory where Mesos helper binaries are located.
    * @param callback A callback HealthChecker uses to send health status
@@ -76,6 +79,35 @@ public:
       const Option<pid_t>& taskPid,
       const std::vector<std::string>& namespaces);
 
+  /**
+   * Attempts to create a `HealthChecker` object. In case of success, health
+   * checking starts immediately after initialization.
+   *
+   * If the check is a command health check, the checker will delegate the
+   * execution of the check to the Mesos agent via the
+   * `LaunchNestedContainerSession` API call.
+   *
+   * @param check The protobuf message definition of health check.
+   * @param launcherDir A directory where Mesos helper binaries are located.
+   * @param callback A callback HealthChecker uses to send health status
+   *     updates to its owner (usually an executor).
+   * @param taskId The TaskID of the target task.
+   * @param taskContainerId The ContainerID of the target task.
+   * @param agentURL The URL of the agent.
+   * @return A `HealthChecker` object or an error if `create` fails.
+   *
+   * @todo A better approach would be to return a stream of updates, e.g.,
+   * `process::Stream<TaskHealthStatus>` rather than invoking a callback.
+   */
+  static Try<process::Owned<HealthChecker>> create(
+      const HealthCheck& check,
+      const std::string& launcherDir,
+      const lambda::function<void(const TaskHealthStatus&)>& callback,
+      const TaskID& taskId,
+      const ContainerID& taskContainerId,
+      const process::http::URL& agentURL);
+
+
   ~HealthChecker();
 
   /**
@@ -99,7 +131,10 @@ public:
       const lambda::function<void(const TaskHealthStatus&)>& _callback,
       const TaskID& _taskId,
       const Option<pid_t>& _taskPid,
-      const std::vector<std::string>& _namespaces);
+      const std::vector<std::string>& _namespaces,
+      const Option<ContainerID>& _taskContainerId,
+      const Option<process::http::URL>& _agentURL,
+      bool _commandCheckViaAgent);
 
   virtual ~HealthCheckerProcess() {}
 
@@ -117,6 +152,39 @@ private:
 
   process::Future<Nothing> commandHealthCheck();
 
+  process::Future<Nothing> nestedCommandHealthCheck();
+
+  process::Future<Nothing> _nestedCommandHealthCheck(
+      process::http::Connection connection);
+
+  process::Future<Nothing> __nestedCommandHealthCheck(
+      const ContainerID& checkContainerId,
+      const process::http::Response& launchResponse);
+
+  process::Future<process::http::Response>
+  nestedCommandHealthCheckTimedOut(
+      const ContainerID& checkContainerId,
+      process::http::Connection connection,
+      process::Future<process::http::Response> future);
+
+  /**
+   * Waits for a container to be terminated.
+   *
+   * Waits for a container to be terminated via the Agent's
+   * `WaitNestedContainer` API call.
+   *
+   * @param containerId The `ContainerID` of the container that we want
+   * to wait for.
+   *
+   * @return The exit status as returned by the Agent.
+   */
+  process::Future<Option<int>> waitNestedContainer(
+      const ContainerID& containerId);
+
+  process::Future<Option<int>> _waitNestedContainer(
+      const ContainerID& containerId,
+      const process::http::Response& httpResponse);
+
   process::Future<Nothing> httpHealthCheck();
 
   process::Future<Nothing> _httpHealthCheck(
@@ -148,6 +216,10 @@ private:
   const TaskID taskId;
   const Option<pid_t> taskPid;
   const std::vector<std::string> namespaces;
+  const Option<ContainerID> taskContainerId;
+  const Option<process::http::URL> agentURL;
+  const bool commandCheckViaAgent;
+
   Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
 
   uint32_t consecutiveFailures;

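For reference, wiring up the new overload outside the default executor would look roughly like the sketch below; `onHealthUpdate` and `startHealthChecking` are hypothetical names, the surrounding namespace is assumed to be `mesos::internal` (matching the executor), and the error handling mirrors the executor change that follows:

#include <glog/logging.h>

#include <mesos/mesos.hpp>

#include <process/http.hpp>
#include <process/owned.hpp>

#include <stout/try.hpp>

#include "checks/health_checker.hpp"

// Placeholder callback; a real executor would `defer()` this to itself.
void onHealthUpdate(const mesos::TaskHealthStatus& status)
{
  LOG(INFO) << "Task '" << status.task_id().value() << "' is "
            << (status.healthy() ? "healthy" : "unhealthy");
}

void startHealthChecking(
    const mesos::TaskInfo& task,
    const std::string& launcherDirectory,
    const mesos::ContainerID& containerId,
    const process::http::URL& agentURL)
{
  Try<process::Owned<checks::HealthChecker>> healthChecker =
    checks::HealthChecker::create(
        task.health_check(),
        launcherDirectory,
        onHealthUpdate,
        task.task_id(),
        containerId,  // The task's `ContainerID`; checks nest under it.
        agentURL);    // The agent API endpoint.

  if (healthChecker.isError()) {
    LOG(ERROR) << "Failed to create health checker: "
               << healthChecker.error();
  }
}
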
http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index f19bba9..58efb4c 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -143,6 +143,8 @@ public:
     connectionId = UUID::random();
 
     doReliableRegistration();
+
+    // TODO(gkleiman): Resume (health) checks.
   }
 
   void disconnected()
@@ -160,6 +162,8 @@ public:
         container->waiting = None();
       }
     }
+
+    // TODO(gkleiman): Stop (health) checks.
   }
 
   void received(const Event& event)
@@ -502,18 +506,14 @@ protected:
       }
 
       if (task.has_health_check()) {
-        // TODO(anand): Add support for command health checks.
-        CHECK_NE(HealthCheck::COMMAND, task.health_check().type())
-          << "Command health checks are not supported yet";
-
         Try<Owned<checks::HealthChecker>> healthChecker =
           checks::HealthChecker::create(
               task.health_check(),
               launcherDirectory,
               defer(self(), &Self::taskHealthUpdated, lambda::_1),
               taskId,
-              None(),
-              vector<string>());
+              containerId,
+              agent);
 
         if (healthChecker.isError()) {
           // TODO(anand): Should we send a TASK_FAILED instead?

http://git-wip-us.apache.org/repos/asf/mesos/blob/4bbfaebb/src/tests/health_check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/health_check_tests.cpp b/src/tests/health_check_tests.cpp
index a5b35cf..211f8b8 100644
--- a/src/tests/health_check_tests.cpp
+++ b/src/tests/health_check_tests.cpp
@@ -35,6 +35,7 @@
 #include "tests/health_check_test_helper.hpp"
 #include "tests/mesos.hpp"
 #include "tests/mock_docker.hpp"
+#include "tests/resources_utils.hpp"
 #include "tests/utils.hpp"
 
 #ifdef __linux__
@@ -2131,6 +2132,118 @@ TEST_F(HealthCheckTest, ROOT_DOCKER_DockerHealthyTaskViaTCP)
   }
 }
 
+
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    HealthCheckTest, DefaultExecutorCommandHealthCheck)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command health
+  // checks) tries to start an IO switchboard, which doesn't work in
+  // local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+    &sched, DEFAULT_FRAMEWORK_INFO, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  Future<FrameworkID> frameworkId;
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .WillOnce(FutureArg<1>(&frameworkId));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0u, offers.get().size());
+
+  Future<TaskStatus> statusRunning;
+  Future<TaskStatus> statusHealthy;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&statusRunning))
+    .WillOnce(FutureArg<1>(&statusHealthy));
+
+  TaskInfo task = createTask(offers->front(), "sleep 120");
+
+  HealthCheck healthCheck;
+
+  healthCheck.set_type(HealthCheck::COMMAND);
+  healthCheck.mutable_command()->set_value("exit $STATUS");
+  healthCheck.set_delay_seconds(0);
+  healthCheck.set_interval_seconds(0);
+  healthCheck.set_grace_period_seconds(0);
+
+  Environment::Variable* variable = healthCheck.mutable_command()->
+    mutable_environment()->mutable_variables()->Add();
+  variable->set_name("STATUS");
+  variable->set_value("0");
+
+  task.mutable_health_check()->CopyFrom(healthCheck);
+
+  Resources executorResources =
+    allocatedResources(Resources::parse("cpus:0.1;mem:32;disk:32").get(), "*");
+
+  task.mutable_resources()->CopyFrom(task.resources() - executorResources);
+
+  TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(task);
+
+  ExecutorInfo executor;
+  executor.mutable_executor_id()->set_value("default");
+  executor.set_type(ExecutorInfo::DEFAULT);
+  executor.mutable_framework_id()->CopyFrom(frameworkId.get());
+  executor.mutable_resources()->CopyFrom(executorResources);
+  executor.mutable_shutdown_grace_period()->set_nanoseconds(Seconds(10).ns());
+
+  driver.acceptOffers(
+      {offers->front().id()}, {LAUNCH_GROUP(executor, taskGroup)});
+
+  AWAIT_READY(statusRunning);
+  EXPECT_EQ(TASK_RUNNING, statusRunning.get().state());
+
+  AWAIT_READY(statusHealthy);
+  EXPECT_EQ(TASK_RUNNING, statusHealthy.get().state());
+  EXPECT_TRUE(statusHealthy.get().has_healthy());
+  EXPECT_TRUE(statusHealthy.get().healthy());
+
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+
+  AWAIT_READY(containerIds);
+
+  driver.stop();
+  driver.join();
+
+  // Clean up all Mesos-launched containers.
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {