Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/30 17:35:38 UTC

[1/4] mesos git commit: Cleaned up namespaces in "checker.cpp".

Repository: mesos
Updated Branches:
  refs/heads/master 080e1b7eb -> 1a1fa95d0


Cleaned up namespaces in "checker.cpp".


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a1fa95d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a1fa95d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a1fa95d

Branch: refs/heads/master
Commit: 1a1fa95d0de179d7efab002a99a0e6261ce307f9
Parents: 3f81c6f
Author: Alexander Rukletsov <al...@apache.org>
Authored: Thu Mar 30 17:53:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200

----------------------------------------------------------------------
 src/checks/checker.cpp | 108 ++++++++++++++++++++------------------------
 1 file changed, 50 insertions(+), 58 deletions(-)
----------------------------------------------------------------------
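
For context, the cleanup in the diff below amounts to declaring a namespace
alias once and then dropping the repeated `process::http::` qualification.
A minimal, self-contained sketch of the pattern (the `process::http::Response`
struct here is a hypothetical stub standing in for the real libprocess type,
not code taken from Mesos):

    #include <iostream>
    #include <string>

    // Hypothetical stub of the nested namespace being aliased.
    namespace process {
    namespace http {

    struct Response
    {
      int code;
      std::string body;
    };

    } // namespace http
    } // namespace process

    // The alias is declared once near the top of the translation unit...
    namespace http = process::http;

    // ...so later declarations can write `http::Response` instead of
    // repeating the full `process::http::Response` qualification.
    bool isOk(const http::Response& response)
    {
      return response.code == 200;
    }

    int main()
    {
      std::cout << std::boolalpha
                << isOk(http::Response{200, "OK"}) << std::endl;
      return 0;
    }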


http://git-wip-us.apache.org/repos/asf/mesos/blob/1a1fa95d/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index e48e037..7510bf2 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -68,15 +68,14 @@
 #include "linux/ns.hpp"
 #endif
 
+namespace http = process::http;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Promise;
 using process::Subprocess;
 
-using process::http::Connection;
-using process::http::Response;
-
 using std::map;
 using std::shared_ptr;
 using std::string;
@@ -139,7 +138,7 @@ public:
       const Option<pid_t>& _taskPid,
       const vector<string>& _namespaces,
       const Option<ContainerID>& _taskContainerId,
-      const Option<process::http::URL>& _agentURL,
+      const Option<http::URL>& _agentURL,
       bool _commandCheckViaAgent);
 
   void pause();
@@ -158,44 +157,40 @@ private:
       const Stopwatch& stopwatch,
       const Option<CheckStatusInfo>& result);
 
-  process::Future<int> commandCheck();
+  Future<int> commandCheck();
 
-  process::Future<int> nestedCommandCheck();
-  void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise);
+  Future<int> nestedCommandCheck();
+  void _nestedCommandCheck(shared_ptr<Promise<int>> promise);
   void __nestedCommandCheck(
-      std::shared_ptr<process::Promise<int>> promise,
-      process::http::Connection connection);
+      shared_ptr<Promise<int>> promise,
+      http::Connection connection);
   void ___nestedCommandCheck(
-      std::shared_ptr<process::Promise<int>> promise,
+      shared_ptr<Promise<int>> promise,
       const ContainerID& checkContainerId,
-      const process::http::Response& launchResponse);
+      const http::Response& launchResponse);
 
   void nestedCommandCheckFailure(
-      std::shared_ptr<process::Promise<int>> promise,
-      process::http::Connection connection,
+      shared_ptr<Promise<int>> promise,
+      http::Connection connection,
       ContainerID checkContainerId,
-      std::shared_ptr<bool> checkTimedOut,
-      const std::string& failure);
+      shared_ptr<bool> checkTimedOut,
+      const string& failure);
 
-  process::Future<Option<int>> waitNestedContainer(
-      const ContainerID& containerId);
-  process::Future<Option<int>> _waitNestedContainer(
+  Future<Option<int>> waitNestedContainer(const ContainerID& containerId);
+  Future<Option<int>> _waitNestedContainer(
       const ContainerID& containerId,
-      const process::http::Response& httpResponse);
+      const http::Response& httpResponse);
 
   void processCommandCheckResult(
       const Stopwatch& stopwatch,
-      const process::Future<int>& result);
-
-  process::Future<int> httpCheck();
-  process::Future<int> _httpCheck(
-      const std::tuple<
-          process::Future<Option<int>>,
-          process::Future<std::string>,
-          process::Future<std::string>>& t);
+      const Future<int>& result);
+
+  Future<int> httpCheck();
+  Future<int> _httpCheck(
+      const tuple<Future<Option<int>>, Future<string>, Future<string>>& t);
   void processHttpCheckResult(
       const Stopwatch& stopwatch,
-      const process::Future<int>& result);
+      const Future<int>& result);
 
   const CheckInfo check;
   Duration checkDelay;
@@ -205,9 +200,9 @@ private:
   const lambda::function<void(const CheckStatusInfo&)> updateCallback;
   const TaskID taskId;
   const Option<pid_t> taskPid;
-  const std::vector<std::string> namespaces;
+  const vector<string> namespaces;
   const Option<ContainerID> taskContainerId;
-  const Option<process::http::URL> agentURL;
+  const Option<http::URL> agentURL;
   const bool commandCheckViaAgent;
 
   Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
@@ -248,12 +243,12 @@ Try<Owned<Checker>> Checker::create(
 }
 
 
-Try<process::Owned<Checker>> Checker::create(
+Try<Owned<Checker>> Checker::create(
     const CheckInfo& check,
     const lambda::function<void(const CheckStatusInfo&)>& callback,
     const TaskID& taskId,
     const ContainerID& taskContainerId,
-    const process::http::URL& agentURL)
+    const http::URL& agentURL)
 {
   // Validate the `CheckInfo` protobuf.
   Option<Error> error = validation::checkInfo(check);
@@ -308,7 +303,7 @@ CheckerProcess::CheckerProcess(
     const Option<pid_t>& _taskPid,
     const vector<string>& _namespaces,
     const Option<ContainerID>& _taskContainerId,
-    const Option<process::http::URL>& _agentURL,
+    const Option<http::URL>& _agentURL,
     bool _commandCheckViaAgent)
   : ProcessBase(process::ID::generate("checker")),
     check(_check),
@@ -587,14 +582,14 @@ Future<int> CheckerProcess::nestedCommandCheck()
     removeContainer->mutable_container_id()->CopyFrom(
         previousCheckContainerId.get());
 
-    process::http::Request request;
+    http::Request request;
     request.method = "POST";
     request.url = agentURL.get();
     request.body = serialize(ContentType::PROTOBUF, evolve(call));
     request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
                        {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
-    process::http::request(request, false)
+    http::request(request, false)
       .onFailed(defer(self(),
                       [this, promise](const string& failure) {
         LOG(WARNING) << "Connection to remove the nested container '"
@@ -606,8 +601,8 @@ Future<int> CheckerProcess::nestedCommandCheck()
         // as a transient failure and discard the promise.
         promise->discard();
       }))
-      .onReady(defer(self(), [this, promise](const Response& response) {
-        if (response.code != process::http::Status::OK) {
+      .onReady(defer(self(), [this, promise](const http::Response& response) {
+        if (response.code != http::Status::OK) {
           // The agent was unable to remove the check container, we
           // treat this as a transient failure and discard the promise.
           LOG(WARNING) << "Received '" << response.status << "' ("
@@ -630,14 +625,13 @@ Future<int> CheckerProcess::nestedCommandCheck()
 }
 
 
-void CheckerProcess::_nestedCommandCheck(
-    shared_ptr<process::Promise<int>> promise)
+void CheckerProcess::_nestedCommandCheck(shared_ptr<Promise<int>> promise)
 {
   // TODO(alexr): Use a lambda named capture for
   // this cached value once it is available.
   const TaskID _taskId = taskId;
 
-  process::http::connect(agentURL.get())
+  http::connect(agentURL.get())
     .onFailed(defer(self(), [_taskId, promise](const string& failure) {
       LOG(WARNING) << "Unable to establish connection with the agent to launch"
                    << " COMMAND check for task '" << _taskId << "'"
@@ -651,8 +645,8 @@ void CheckerProcess::_nestedCommandCheck(
 
 
 void CheckerProcess::__nestedCommandCheck(
-    shared_ptr<process::Promise<int>> promise,
-    Connection connection)
+    shared_ptr<Promise<int>> promise,
+    http::Connection connection)
 {
   ContainerID checkContainerId;
   checkContainerId.set_value("check-" + UUID::random().toString());
@@ -671,7 +665,7 @@ void CheckerProcess::__nestedCommandCheck(
   launch->mutable_container_id()->CopyFrom(checkContainerId);
   launch->mutable_command()->CopyFrom(command);
 
-  process::http::Request request;
+  http::Request request;
   request.method = "POST";
   request.url = agentURL.get();
   request.body = serialize(ContentType::PROTOBUF, evolve(call));
@@ -698,7 +692,8 @@ void CheckerProcess::__nestedCommandCheck(
   // check command has finished or the connection has been closed.
   connection.send(request, false)
     .after(checkTimeout,
-           defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+           defer(self(),
+                 [timeout, checkTimedOut](Future<http::Response> future) {
       future.discard();
 
       *checkTimedOut = true;
@@ -721,11 +716,11 @@ void CheckerProcess::__nestedCommandCheck(
 
 
 void CheckerProcess::___nestedCommandCheck(
-    shared_ptr<process::Promise<int>> promise,
+    shared_ptr<Promise<int>> promise,
     const ContainerID& checkContainerId,
-    const Response& launchResponse)
+    const http::Response& launchResponse)
 {
-  if (launchResponse.code != process::http::Status::OK) {
+  if (launchResponse.code != http::Status::OK) {
     // The agent was unable to launch the check container,
     // we treat this as a transient failure.
     LOG(WARNING) << "Received '" << launchResponse.status << "' ("
@@ -760,7 +755,7 @@ void CheckerProcess::___nestedCommandCheck(
 
 void CheckerProcess::nestedCommandCheckFailure(
     shared_ptr<Promise<int>> promise,
-    Connection connection,
+    http::Connection connection,
     ContainerID checkContainerId,
     shared_ptr<bool> checkTimedOut,
     const string& failure)
@@ -814,15 +809,15 @@ Future<Option<int>> CheckerProcess::waitNestedContainer(
 
   containerWait->mutable_container_id()->CopyFrom(containerId);
 
-  process::http::Request request;
+  http::Request request;
   request.method = "POST";
   request.url = agentURL.get();
   request.body = serialize(ContentType::PROTOBUF, evolve(call));
   request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
                      {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
-  return process::http::request(request, false)
-    .repair([containerId](const Future<Response>& future) {
+  return http::request(request, false)
+    .repair([containerId](const Future<http::Response>& future) {
       return Failure(
           "Connection to wait for check container '" +
           stringify(containerId) + "' failed: " + future.failure());
@@ -834,9 +829,9 @@ Future<Option<int>> CheckerProcess::waitNestedContainer(
 
 Future<Option<int>> CheckerProcess::_waitNestedContainer(
     const ContainerID& containerId,
-    const Response& httpResponse)
+    const http::Response& httpResponse)
 {
-  if (httpResponse.code != process::http::Status::OK) {
+  if (httpResponse.code != http::Status::OK) {
     return Failure(
         "Received '" + httpResponse.status + "' (" + httpResponse.body +
         ") while waiting on check container '" + stringify(containerId) + "'");
@@ -979,10 +974,7 @@ Future<int> CheckerProcess::httpCheck()
 
 
 Future<int> CheckerProcess::_httpCheck(
-    const tuple<
-        Future<Option<int>>,
-        Future<string>,
-        Future<string>>& t)
+    const tuple<Future<Option<int>>, Future<string>, Future<string>>& t)
 {
   const Future<Option<int>>& status = std::get<0>(t);
   if (!status.isReady()) {
@@ -1032,7 +1024,7 @@ Future<int> CheckerProcess::_httpCheck(
 
 void CheckerProcess::processHttpCheckResult(
     const Stopwatch& stopwatch,
-    const process::Future<int>& result)
+    const Future<int>& result)
 {
   CheckStatusInfo checkStatusInfo;
   checkStatusInfo.set_type(check.type());


[3/4] mesos git commit: Added support for COMMAND checks to the default executor.

Posted by al...@apache.org.
Added support for COMMAND checks to the default executor.

Review: https://reviews.apache.org/r/58030/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f81c6f6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f81c6f6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f81c6f6

Branch: refs/heads/master
Commit: 3f81c6f6052768e326e84e2eab93c20572b490ad
Parents: 3a689ab
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 17:14:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200

----------------------------------------------------------------------
 src/checks/checker.cpp            | 454 ++++++++++++++++++--
 src/checks/checker.hpp            |  32 +-
 src/launcher/default_executor.cpp |   8 +-
 src/tests/check_tests.cpp         | 733 ++++++++++++++++++++++++++++++++-
 4 files changed, 1181 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
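
From a framework's perspective, the feature added below is consumed by
attaching a COMMAND check to a task launched under the default executor, as
the new tests in `check_tests.cpp` do. A short sketch mirroring that test
code (field values are illustrative only):

    #include <mesos/v1/mesos.hpp>

    // Attaches a COMMAND check to a task; the command's exit code is later
    // reported to the scheduler in `TaskStatus.check_status.command.exit_code`.
    void addCommandCheck(mesos::v1::TaskInfo* taskInfo)
    {
      mesos::v1::CheckInfo* checkInfo = taskInfo->mutable_check();
      checkInfo->set_type(mesos::v1::CheckInfo::COMMAND);
      checkInfo->set_delay_seconds(0);     // run the first check immediately
      checkInfo->set_interval_seconds(10); // re-run the check every 10 seconds
      checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
    }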


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index d1e9083..e48e037 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -19,6 +19,7 @@
 #include <cstdint>
 #include <iterator>
 #include <map>
+#include <memory>
 #include <string>
 #include <tuple>
 #include <vector>
@@ -28,6 +29,8 @@
 #include <mesos/mesos.hpp>
 #include <mesos/type_utils.hpp>
 
+#include <mesos/agent/agent.hpp>
+
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -49,14 +52,18 @@
 #include <stout/strings.hpp>
 #include <stout/try.hpp>
 #include <stout/unreachable.hpp>
+#include <stout/uuid.hpp>
 
 #include <stout/os/environment.hpp>
 #include <stout/os/killtree.hpp>
 
+#include "common/http.hpp"
 #include "common/protobuf_utils.hpp"
 #include "common/status_utils.hpp"
 #include "common/validation.hpp"
 
+#include "internal/evolve.hpp"
+
 #ifdef __linux__
 #include "linux/ns.hpp"
 #endif
@@ -64,9 +71,14 @@
 using process::Failure;
 using process::Future;
 using process::Owned;
+using process::Promise;
 using process::Subprocess;
 
+using process::http::Connection;
+using process::http::Response;
+
 using std::map;
+using std::shared_ptr;
 using std::string;
 using std::tuple;
 using std::vector;
@@ -125,7 +137,10 @@ public:
       const lambda::function<void(const CheckStatusInfo&)>& _callback,
       const TaskID& _taskId,
       const Option<pid_t>& _taskPid,
-      const std::vector<std::string>& _namespaces);
+      const vector<string>& _namespaces,
+      const Option<ContainerID>& _taskContainerId,
+      const Option<process::http::URL>& _agentURL,
+      bool _commandCheckViaAgent);
 
   void pause();
   void resume();
@@ -141,9 +156,33 @@ private:
   void scheduleNext(const Duration& duration);
   void processCheckResult(
       const Stopwatch& stopwatch,
-      const CheckStatusInfo& result);
+      const Option<CheckStatusInfo>& result);
 
   process::Future<int> commandCheck();
+
+  process::Future<int> nestedCommandCheck();
+  void _nestedCommandCheck(std::shared_ptr<process::Promise<int>> promise);
+  void __nestedCommandCheck(
+      std::shared_ptr<process::Promise<int>> promise,
+      process::http::Connection connection);
+  void ___nestedCommandCheck(
+      std::shared_ptr<process::Promise<int>> promise,
+      const ContainerID& checkContainerId,
+      const process::http::Response& launchResponse);
+
+  void nestedCommandCheckFailure(
+      std::shared_ptr<process::Promise<int>> promise,
+      process::http::Connection connection,
+      ContainerID checkContainerId,
+      std::shared_ptr<bool> checkTimedOut,
+      const std::string& failure);
+
+  process::Future<Option<int>> waitNestedContainer(
+      const ContainerID& containerId);
+  process::Future<Option<int>> _waitNestedContainer(
+      const ContainerID& containerId,
+      const process::http::Response& httpResponse);
+
   void processCommandCheckResult(
       const Stopwatch& stopwatch,
       const process::Future<int>& result);
@@ -167,10 +206,18 @@ private:
   const TaskID taskId;
   const Option<pid_t> taskPid;
   const std::vector<std::string> namespaces;
+  const Option<ContainerID> taskContainerId;
+  const Option<process::http::URL> agentURL;
+  const bool commandCheckViaAgent;
+
   Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
 
   CheckStatusInfo previousCheckStatus;
   bool paused;
+
+  // Contains the ID of the most recently terminated nested container
+  // that was used to perform a COMMAND check.
+  Option<ContainerID> previousCheckContainerId;
 };
 
 
@@ -192,7 +239,37 @@ Try<Owned<Checker>> Checker::create(
       callback,
       taskId,
       taskPid,
-      namespaces));
+      namespaces,
+      None(),
+      None(),
+      false));
+
+  return Owned<Checker>(new Checker(process));
+}
+
+
+Try<process::Owned<Checker>> Checker::create(
+    const CheckInfo& check,
+    const lambda::function<void(const CheckStatusInfo&)>& callback,
+    const TaskID& taskId,
+    const ContainerID& taskContainerId,
+    const process::http::URL& agentURL)
+{
+  // Validate the `CheckInfo` protobuf.
+  Option<Error> error = validation::checkInfo(check);
+  if (error.isSome()) {
+    return error.get();
+  }
+
+  Owned<CheckerProcess> process(new CheckerProcess(
+      check,
+      callback,
+      taskId,
+      None(),
+      {},
+      taskContainerId,
+      agentURL,
+      true));
 
   return Owned<Checker>(new Checker(process));
 }
@@ -229,13 +306,19 @@ CheckerProcess::CheckerProcess(
     const lambda::function<void(const CheckStatusInfo&)>& _callback,
     const TaskID& _taskId,
     const Option<pid_t>& _taskPid,
-    const vector<string>& _namespaces)
+    const vector<string>& _namespaces,
+    const Option<ContainerID>& _taskContainerId,
+    const Option<process::http::URL>& _agentURL,
+    bool _commandCheckViaAgent)
   : ProcessBase(process::ID::generate("checker")),
     check(_check),
     updateCallback(_callback),
     taskId(_taskId),
     taskPid(_taskPid),
     namespaces(_namespaces),
+    taskContainerId(_taskContainerId),
+    agentURL(_agentURL),
+    commandCheckViaAgent(_commandCheckViaAgent),
     paused(false)
 {
   Try<Duration> create = Duration::create(check.delay_seconds());
@@ -306,7 +389,9 @@ void CheckerProcess::performCheck()
 
   switch (check.type()) {
     case CheckInfo::COMMAND: {
-      commandCheck().onAny(defer(
+      Future<int> future = commandCheckViaAgent ? nestedCommandCheck()
+                                                : commandCheck();
+      future.onAny(defer(
           self(),
           &Self::processCommandCheckResult, stopwatch, lambda::_1));
       break;
@@ -361,7 +446,7 @@ void CheckerProcess::resume()
 
 void CheckerProcess::processCheckResult(
     const Stopwatch& stopwatch,
-    const CheckStatusInfo& result)
+    const Option<CheckStatusInfo>& result)
 {
   // `Checker` might have been paused while performing the check.
   if (paused) {
@@ -370,16 +455,20 @@ void CheckerProcess::processCheckResult(
     return;
   }
 
-  VLOG(1) << "Performed " << check.type() << " check"
-          << " for task '" << taskId << "' in " << stopwatch.elapsed();
-
-  // Trigger the callback if check info changes.
-  if (result != previousCheckStatus) {
-    // We assume this is a local send, i.e., the checker library is not used
-    // in a binary external to the executor and hence can not exit before
-    // the data is sent to the executor.
-    updateCallback(result);
-    previousCheckStatus = result;
+  // `result` should be some if it was possible to perform the check,
+  // and empty if there was a transient error.
+  if (result.isSome()) {
+    VLOG(1) << "Performed " << check.type() << " check"
+            << " for task '" << taskId << "' in " << stopwatch.elapsed();
+
+    // Trigger the callback if check info changes.
+    if (result.get() != previousCheckStatus) {
+      // We assume this is a local send, i.e., the checker library is not used
+      // in a binary external to the executor and hence can not exit before
+      // the data is sent to the executor.
+      updateCallback(result.get());
+      previousCheckStatus = result.get();
+    }
   }
 
   scheduleNext(checkInterval);
@@ -470,37 +559,346 @@ Future<int> CheckerProcess::commandCheck()
 }
 
 
+Future<int> CheckerProcess::nestedCommandCheck()
+{
+  CHECK_EQ(CheckInfo::COMMAND, check.type());
+  CHECK(check.has_command());
+  CHECK_SOME(taskContainerId);
+  CHECK_SOME(agentURL);
+
+  VLOG(1) << "Launching COMMAND check for task '" << taskId << "'";
+
+  // We don't want recoverable errors, e.g., the agent responding with
+  // HTTP status code 503, to trigger a check failure.
+  //
+  // The future returned by this method represents the result of a
+  // check. It will be set to the exit status of the check command if it
+  // succeeded, to a `Failure` if there was a non-transient error, and
+  // discarded if there was a transient error.
+  auto promise = std::make_shared<Promise<int>>();
+
+  if (previousCheckContainerId.isSome()) {
+    agent::Call call;
+    call.set_type(agent::Call::REMOVE_NESTED_CONTAINER);
+
+    agent::Call::RemoveNestedContainer* removeContainer =
+      call.mutable_remove_nested_container();
+
+    removeContainer->mutable_container_id()->CopyFrom(
+        previousCheckContainerId.get());
+
+    process::http::Request request;
+    request.method = "POST";
+    request.url = agentURL.get();
+    request.body = serialize(ContentType::PROTOBUF, evolve(call));
+    request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+                       {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+    process::http::request(request, false)
+      .onFailed(defer(self(),
+                      [this, promise](const string& failure) {
+        LOG(WARNING) << "Connection to remove the nested container '"
+                     << previousCheckContainerId.get()
+                     << "' used for the COMMAND check for task '"
+                     << taskId << "' failed: " << failure;
+
+        // Something went wrong while sending the request, we treat this
+        // as a transient failure and discard the promise.
+        promise->discard();
+      }))
+      .onReady(defer(self(), [this, promise](const Response& response) {
+        if (response.code != process::http::Status::OK) {
+          // The agent was unable to remove the check container, we
+          // treat this as a transient failure and discard the promise.
+          LOG(WARNING) << "Received '" << response.status << "' ("
+                       << response.body << ") while removing the nested"
+                       << " container '" << previousCheckContainerId.get()
+                       << "' used for the COMMAND check for task '"
+                       << taskId << "'";
+
+          promise->discard();
+        }
+
+        previousCheckContainerId = None();
+        _nestedCommandCheck(promise);
+      }));
+  } else {
+    _nestedCommandCheck(promise);
+  }
+
+  return promise->future();
+}
+
+
+void CheckerProcess::_nestedCommandCheck(
+    shared_ptr<process::Promise<int>> promise)
+{
+  // TODO(alexr): Use a lambda named capture for
+  // this cached value once it is available.
+  const TaskID _taskId = taskId;
+
+  process::http::connect(agentURL.get())
+    .onFailed(defer(self(), [_taskId, promise](const string& failure) {
+      LOG(WARNING) << "Unable to establish connection with the agent to launch"
+                   << " COMMAND check for task '" << _taskId << "'"
+                   << ": " << failure;
+
+      // We treat this as a transient failure.
+      promise->discard();
+    }))
+    .onReady(defer(self(), &Self::__nestedCommandCheck, promise, lambda::_1));
+}
+
+
+void CheckerProcess::__nestedCommandCheck(
+    shared_ptr<process::Promise<int>> promise,
+    Connection connection)
+{
+  ContainerID checkContainerId;
+  checkContainerId.set_value("check-" + UUID::random().toString());
+  checkContainerId.mutable_parent()->CopyFrom(taskContainerId.get());
+
+  previousCheckContainerId = checkContainerId;
+
+  CommandInfo command(check.command().command());
+
+  agent::Call call;
+  call.set_type(agent::Call::LAUNCH_NESTED_CONTAINER_SESSION);
+
+  agent::Call::LaunchNestedContainerSession* launch =
+    call.mutable_launch_nested_container_session();
+
+  launch->mutable_container_id()->CopyFrom(checkContainerId);
+  launch->mutable_command()->CopyFrom(command);
+
+  process::http::Request request;
+  request.method = "POST";
+  request.url = agentURL.get();
+  request.body = serialize(ContentType::PROTOBUF, evolve(call));
+  request.headers = {{"Accept", stringify(ContentType::RECORDIO)},
+                     {"Message-Accept", stringify(ContentType::PROTOBUF)},
+                     {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+  // TODO(alexr): Use a lambda named capture for
+  // this cached value once it is available.
+  const Duration timeout = checkTimeout;
+
+  auto checkTimedOut = std::make_shared<bool>(false);
+
+  // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
+  // the output of the container. The agent will close the stream once
+  // the container has exited, or kill the container if the client
+  // closes the connection.
+  //
+  // We're calling `Connection::send` with `streamed = false`, so that
+  // it returns an HTTP response of type 'BODY' once the entire response
+  // is received.
+  //
+  // This means that this future will not be completed until after the
+  // check command has finished or the connection has been closed.
+  connection.send(request, false)
+    .after(checkTimeout,
+           defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+      future.discard();
+
+      *checkTimedOut = true;
+
+      return Failure("Command timed out after " + stringify(timeout));
+    }))
+    .onFailed(defer(self(),
+                    &Self::nestedCommandCheckFailure,
+                    promise,
+                    connection,
+                    checkContainerId,
+                    checkTimedOut,
+                    lambda::_1))
+    .onReady(defer(self(),
+                   &Self::___nestedCommandCheck,
+                   promise,
+                   checkContainerId,
+                   lambda::_1));
+}
+
+
+void CheckerProcess::___nestedCommandCheck(
+    shared_ptr<process::Promise<int>> promise,
+    const ContainerID& checkContainerId,
+    const Response& launchResponse)
+{
+  if (launchResponse.code != process::http::Status::OK) {
+    // The agent was unable to launch the check container,
+    // we treat this as a transient failure.
+    LOG(WARNING) << "Received '" << launchResponse.status << "' ("
+                 << launchResponse.body << ") while launching COMMAND check"
+                 << " for task '" << taskId << "'";
+
+    promise->discard();
+    return;
+  }
+
+  waitNestedContainer(checkContainerId)
+    .onFailed([promise](const string& failure) {
+      promise->fail(
+          "Unable to get the exit code: " + failure);
+    })
+    .onReady([promise](const Option<int>& status) -> void {
+      if (status.isNone()) {
+        promise->fail("Unable to get the exit code");
+      // TODO(gkleiman): Make sure that the following block works on Windows.
+      } else if (WIFSIGNALED(status.get()) &&
+                 WTERMSIG(status.get()) == SIGKILL) {
+        // The check container was signaled, probably because the task
+        // finished while the check was still in-flight, so we discard
+        // the result.
+        promise->discard();
+      } else {
+        promise->set(status.get());
+      }
+    });
+}
+
+
+void CheckerProcess::nestedCommandCheckFailure(
+    shared_ptr<Promise<int>> promise,
+    Connection connection,
+    ContainerID checkContainerId,
+    shared_ptr<bool> checkTimedOut,
+    const string& failure)
+{
+  if (*checkTimedOut) {
+    // The check timed out, closing the connection will make the agent
+    // kill the container.
+    connection.disconnect();
+
+    // If the check delay interval is zero, we'll try to perform another
+    // check right after we finish processing the current timeout.
+    //
+    // We'll try to remove the container created for the check at the
+    // beginning of the next check. In order to prevent a failure, the
+    // promise should only be completed once we're sure that the
+    // container has terminated.
+    waitNestedContainer(checkContainerId)
+      .onAny([failure, promise](const Future<Option<int>>&) {
+        // We assume that once `WaitNestedContainer` returns,
+        // irrespective of whether the response contains a failure, the
+        // container will be in a terminal state, and that it will be
+        // possible to remove it.
+        //
+        // This means that we don't need to retry the
+        // `WaitNestedContainer` call.
+        promise->fail(failure);
+      });
+  } else {
+    // The agent was not able to complete the request, discarding the
+    // promise signals the checker that it should retry the check.
+    //
+    // This will allow us to recover from a blip. The executor will
+    // pause the checker when it detects that the agent is not
+    // available.
+    LOG(WARNING) << "Connection to the agent to launch COMMAND check"
+                 << " for task '" << taskId << "' failed: " << failure;
+
+    promise->discard();
+  }
+}
+
+
+Future<Option<int>> CheckerProcess::waitNestedContainer(
+    const ContainerID& containerId)
+{
+  agent::Call call;
+  call.set_type(agent::Call::WAIT_NESTED_CONTAINER);
+
+  agent::Call::WaitNestedContainer* containerWait =
+    call.mutable_wait_nested_container();
+
+  containerWait->mutable_container_id()->CopyFrom(containerId);
+
+  process::http::Request request;
+  request.method = "POST";
+  request.url = agentURL.get();
+  request.body = serialize(ContentType::PROTOBUF, evolve(call));
+  request.headers = {{"Accept", stringify(ContentType::PROTOBUF)},
+                     {"Content-Type", stringify(ContentType::PROTOBUF)}};
+
+  return process::http::request(request, false)
+    .repair([containerId](const Future<Response>& future) {
+      return Failure(
+          "Connection to wait for check container '" +
+          stringify(containerId) + "' failed: " + future.failure());
+    })
+    .then(defer(self(),
+                &Self::_waitNestedContainer, containerId, lambda::_1));
+}
+
+
+Future<Option<int>> CheckerProcess::_waitNestedContainer(
+    const ContainerID& containerId,
+    const Response& httpResponse)
+{
+  if (httpResponse.code != process::http::Status::OK) {
+    return Failure(
+        "Received '" + httpResponse.status + "' (" + httpResponse.body +
+        ") while waiting on check container '" + stringify(containerId) + "'");
+  }
+
+  Try<agent::Response> response =
+    deserialize<agent::Response>(ContentType::PROTOBUF, httpResponse.body);
+  CHECK_SOME(response);
+
+  CHECK(response->has_wait_nested_container());
+
+  return (
+      response->wait_nested_container().has_exit_status()
+        ? Option<int>(response->wait_nested_container().exit_status())
+        : Option<int>::none());
+}
+
+
 void CheckerProcess::processCommandCheckResult(
     const Stopwatch& stopwatch,
-    const Future<int>& result)
+    const Future<int>& future)
 {
-  CheckStatusInfo checkStatusInfo;
-  checkStatusInfo.set_type(check.type());
+  Option<CheckStatusInfo> result;
 
-  // On Posix, `result` corresponds to termination information in the
+  // On Posix, `future` corresponds to termination information in the
   // `stat_loc` area. On Windows, `status` is obtained via calling the
   // `GetExitCodeProcess()` function.
   //
   // TODO(alexr): Ensure `WEXITSTATUS` family macros are no-op on Windows,
   // see MESOS-7242.
-  if (result.isReady() && WIFEXITED(result.get())) {
-    const int exitCode = WEXITSTATUS(result.get());
-    VLOG(1) << check.type() << " check for task '"
-            << taskId << "' returned: " << exitCode;
+  if (future.isReady() && WIFEXITED(future.get())) {
+    const int exitCode = WEXITSTATUS(future.get());
+    VLOG(1) << check.type() << " check for task '" << taskId << "'"
+            << " returned: " << exitCode;
 
+    CheckStatusInfo checkStatusInfo;
+    checkStatusInfo.set_type(check.type());
     checkStatusInfo.mutable_command()->set_exit_code(
         static_cast<int32_t>(exitCode));
+
+    result = checkStatusInfo;
+  } else if (future.isDiscarded()) {
+    // Check's status is currently not available due to a transient error,
+    // e.g., an agent failover; no `CheckStatusInfo` message should be sent
+    // to the callback.
+    LOG(INFO) << check.type() << " check for task '" << taskId << "' discarded";
+
+    result = None();
   } else {
     // Check's status is currently not available, which may indicate a change
     // that should be reported as an empty `CheckStatusInfo.Command` message.
-    LOG(WARNING) << check.type() << " check for task '" << taskId
-                 << "' failed: "
-                 << (result.isFailed() ? result.failure() : "discarded");
+    LOG(WARNING) << check.type() << " check for task '" << taskId << "'"
+                 << " failed: " << future.failure();
 
+    CheckStatusInfo checkStatusInfo;
+    checkStatusInfo.set_type(check.type());
     checkStatusInfo.mutable_command();
+
+    result = checkStatusInfo;
   }
 
-  processCheckResult(stopwatch, checkStatusInfo);
+  processCheckResult(stopwatch, result);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index 1521b9c..fb939d8 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -22,6 +22,7 @@
 
 #include <mesos/mesos.hpp>
 
+#include <process/http.hpp>
 #include <process/owned.hpp>
 
 #include <stout/error.hpp>
@@ -43,6 +44,9 @@ public:
    * Attempts to create a `Checker` object. In case of success, checking
    * starts immediately after initialization.
    *
+   * If the check is a COMMAND check, the checker will fork a process, enter
+   * the task's namespaces, and execute the command.
+   *
    * @param check The protobuf message definition of a check.
    * @param callback A callback `Checker` uses to send check status updates
    *     to its owner (usually an executor).
@@ -56,12 +60,38 @@ public:
    * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
    */
   static Try<process::Owned<Checker>> create(
-      const CheckInfo& checkInfo,
+      const CheckInfo& check,
       const lambda::function<void(const CheckStatusInfo&)>& callback,
       const TaskID& taskId,
       const Option<pid_t>& taskPid,
       const std::vector<std::string>& namespaces);
 
+  /**
+   * Attempts to create a `Checker` object. In case of success, checking
+   * starts immediately after initialization.
+   *
+   * If the check is a COMMAND check, the checker will delegate the execution
+   * of the check to the Mesos agent via the `LaunchNestedContainerSession`
+   * API call.
+   *
+   * @param check The protobuf message definition of a check.
+   * @param callback A callback `Checker` uses to send check status updates
+   *     to its owner (usually an executor).
+   * @param taskId The TaskID of the target task.
+   * @param taskContainerId The ContainerID of the target task.
+   * @param agentURL The URL of the agent.
+   * @return A `Checker` object or an error if `create` fails.
+   *
+   * @todo A better approach would be to return a stream of updates, e.g.,
+   * `process::Stream<CheckStatusInfo>` rather than invoking a callback.
+   */
+  static Try<process::Owned<Checker>> create(
+      const CheckInfo& check,
+      const lambda::function<void(const CheckStatusInfo&)>& callback,
+      const TaskID& taskId,
+      const ContainerID& taskContainerId,
+      const process::http::URL& agentURL);
+
   ~Checker();
 
   // Not copyable, not assignable.
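
The overload declared above is what the default executor now uses (see the
`default_executor.cpp` hunk below). A hedged usage sketch with illustrative
argument names, assuming the `mesos::internal::checks` namespace of the
internal headers; the no-op callback stands in for the executor's update
handler:

    #include <mesos/mesos.hpp>

    #include <process/http.hpp>
    #include <process/owned.hpp>

    #include <stout/try.hpp>

    #include "checks/checker.hpp"

    namespace checks = mesos::internal::checks;

    // Creates a checker that delegates COMMAND checks to the agent via the
    // `LaunchNestedContainerSession` API call, as documented above.
    Try<process::Owned<checks::Checker>> createAgentBackedChecker(
        const mesos::CheckInfo& check,
        const mesos::TaskID& taskId,
        const mesos::ContainerID& taskContainerId,
        const process::http::URL& agentURL)
    {
      return checks::Checker::create(
          check,
          [](const mesos::CheckStatusInfo& status) {
            // A real executor would forward `status` in a task status update.
          },
          taskId,
          taskContainerId,
          agentURL);
    }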

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 79785fc..9cc40c6 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -503,17 +503,13 @@ protected:
         false});
 
       if (task.has_check()) {
-        // TODO(alexr): Add support for command checks.
-        CHECK_NE(CheckInfo::COMMAND, task.check().type())
-          << "Command checks are not supported yet";
-
         Try<Owned<checks::Checker>> checker =
           checks::Checker::create(
               task.check(),
               defer(self(), &Self::taskCheckUpdated, taskId, lambda::_1),
               taskId,
-              None(),
-              vector<string>());
+              containerId,
+              agent);
 
         if (checker.isError()) {
           // TODO(anand): Should we send a TASK_FAILED instead?

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f81c6f6/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 78ac498..d7fcbf9 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -33,11 +33,16 @@
 
 #include "checks/checker.hpp"
 
+#include "slave/containerizer/fetcher.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/health_check_test_helper.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
+using mesos::internal::slave::Fetcher;
+using mesos::internal::slave::MesosContainerizer;
+
 using mesos::master::detector::MasterDetector;
 
 using mesos::v1::scheduler::Call;
@@ -211,6 +216,17 @@ public:
 
     mesos->send(call);
   }
+
+  virtual void teardown(
+      Mesos* mesos,
+      const v1::FrameworkID& frameworkId)
+  {
+    Call call;
+    call.set_type(Call::TEARDOWN);
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+
+    mesos->send(call);
+  }
 };
 
 
@@ -661,10 +677,14 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
   v1::TaskInfo taskInfo =
       v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
 
+  // Set both the check and health check intervals to a larger value to
+  // prevent a second update from coming before the reconciliation response.
+  int interval = 10;
+
   v1::CheckInfo* checkInfo = taskInfo.mutable_check();
   checkInfo->set_type(v1::CheckInfo::COMMAND);
   checkInfo->set_delay_seconds(0);
-  checkInfo->set_interval_seconds(0);
+  checkInfo->set_interval_seconds(interval);
   checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
 
   // Delay health check for 1s to ensure health update comes after check update.
@@ -676,7 +696,7 @@ TEST_F(CommandExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
   v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
   healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
   healthCheckInfo->set_delay_seconds(1);
-  healthCheckInfo->set_interval_seconds(0);
+  healthCheckInfo->set_interval_seconds(interval);
   healthCheckInfo->mutable_command()->set_value("exit 0");
 
   launchTask(&mesos, offer, taskInfo);
@@ -860,16 +880,707 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(CommandExecutorCheckTest, HTTPCheckDelivered)
 // These are check tests with the default executor.
 class DefaultExecutorCheckTest : public CheckTest {};
 
-// TODO(alexr): Implement following tests once the default executor supports
-// command checks.
+
+// Verifies that a command check is supported by the default executor,
+// its status is delivered in a task status update, and the last known
+// status can be obtained during explicit and implicit reconciliation.
+// Additionally ensures that the specified environment of the command
+// check is honored.
 //
-// 1. COMMAND check with env var works, is delivered, and is reconciled
-//    properly.
-// 2. COMMAND check's status change is delivered. TODO(alexr): When check
-//    mocking is available, ensure only status changes are delivered.
-// 3. COMMAND check times out.
-// 4. COMMAND check and health check do not shadow each other; upon
-//    reconciliation both statuses are available.
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DefaultExecutorCheckTest,
+    CommandCheckDeliveredAndReconciled)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+    // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateExplicitReconciliation;
+  Future<Event::Update> updateImplicitReconciliation;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateExplicitReconciliation))
+    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+
+  v1::CommandInfo* checkCommand =
+    checkInfo->mutable_command()->mutable_command();
+  checkCommand->set_value("exit $STATUS");
+
+  v1::Environment::Variable* variable =
+    checkCommand->mutable_environment()->add_variables();
+  variable->set_name("STATUS");
+  variable->set_value("1");
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  const v1::TaskStatus& taskRunning = updateTaskRunning->status();
+
+  ASSERT_EQ(TASK_RUNNING, taskRunning.state());
+  EXPECT_EQ(taskInfo.task_id(), taskRunning.task_id());
+  EXPECT_TRUE(taskRunning.has_check_status());
+  EXPECT_TRUE(taskRunning.check_status().has_command());
+  EXPECT_FALSE(taskRunning.check_status().command().has_exit_code());
+
+  acknowledge(&mesos, frameworkId, taskRunning);
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  // Trigger explicit reconciliation.
+  reconcile(
+      &mesos,
+      frameworkId,
+      {std::make_pair(checkResult.task_id(), checkResult.agent_id())});
+
+  AWAIT_READY(updateExplicitReconciliation);
+  const v1::TaskStatus& explicitReconciliation =
+    updateExplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, explicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      explicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), explicitReconciliation.task_id());
+  EXPECT_TRUE(explicitReconciliation.has_check_status());
+  EXPECT_TRUE(explicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, explicitReconciliation.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, explicitReconciliation);
+
+  // Trigger implicit reconciliation.
+  reconcile(&mesos, frameworkId, {});
+
+  AWAIT_READY(updateImplicitReconciliation);
+  const v1::TaskStatus& implicitReconciliation =
+    updateImplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      implicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+  EXPECT_TRUE(implicitReconciliation.has_check_status());
+  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that a command check's status changes are delivered.
+//
+// TODO(alexr): When check mocking is available, ensure that *only*
+// status changes are delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(
+    DefaultExecutorCheckTest,
+    CommandCheckStatusChange)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+    // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateCheckResultChanged;
+  Future<Event::Update> updateCheckResultBack;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateCheckResultChanged))
+    .WillOnce(FutureArg<1>(&updateCheckResultBack))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+  checkInfo->mutable_command()->mutable_command()->set_value(
+      FLAPPING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  AWAIT_READY(updateCheckResultChanged);
+  const v1::TaskStatus& checkResultChanged = updateCheckResultChanged->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultChanged.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultChanged.reason());
+  EXPECT_TRUE(checkResultChanged.check_status().command().has_exit_code());
+  EXPECT_EQ(0, checkResultChanged.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResultChanged);
+
+  AWAIT_READY(updateCheckResultBack);
+  const v1::TaskStatus& checkResultBack = updateCheckResultBack->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultBack.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultBack.reason());
+  EXPECT_TRUE(checkResultBack.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResultBack.check_status().command().exit_code());
+
+  // Cleanup all mesos launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when a command check times out after a successful check,
+// an empty check status update is delivered.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F_TEMP_DISABLED_ON_WINDOWS(DefaultExecutorCheckTest, CommandCheckTimeout)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+    // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateCheckResultTimeout;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateCheckResultTimeout))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+    v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(0);
+  checkInfo->set_timeout_seconds(1);
+  checkInfo->mutable_command()->mutable_command()->set_value(
+      STALLING_CHECK_COMMAND(path::join(os::getcwd(), "XXXXXX")));
+
+  v1::TaskGroupInfo taskGroup;
+  taskGroup.add_tasks()->CopyFrom(taskInfo);
+
+  launchTaskGroup(&mesos, offer, executorInfo, taskGroup);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  AWAIT_READY(updateCheckResultTimeout);
+  const v1::TaskStatus& checkResultTimeout = updateCheckResultTimeout->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResultTimeout.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResultTimeout.reason());
+  EXPECT_FALSE(checkResultTimeout.check_status().command().has_exit_code());
+
+  // Clean up all Mesos-launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
+
+// Verifies that when both a command check and a health check are specified,
+// health and check updates include both statuses. Also verifies that
+// both statuses are included upon reconciliation.
+//
+// TODO(gkleiman): Check if this test works on Windows.
+TEST_F(DefaultExecutorCheckTest, CommandCheckAndHealthCheckNoShadowing)
+{
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  // Disable AuthN on the agent.
+  slave::Flags flags = CreateSlaveFlags();
+  flags.authenticate_http_readwrite = false;
+
+  Fetcher fetcher;
+
+  // We have to explicitly create a `Containerizer` in non-local mode,
+  // because `LaunchNestedContainerSession` (used by command checks)
+  // tries to start an IO switchboard, which doesn't work in local mode yet.
+  Try<MesosContainerizer*> _containerizer =
+    MesosContainerizer::create(flags, false, &fetcher);
+
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::Containerizer> containerizer(_containerizer.get());
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Try<Owned<cluster::Slave>> agent =
+    StartSlave(detector.get(), containerizer.get(), flags);
+  ASSERT_SOME(agent);
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+
+  const v1::Resources resources =
+    v1::Resources::parse("cpus:0.1;mem:32;disk:32").get();
+
+  v1::ExecutorInfo executorInfo;
+  executorInfo.set_type(v1::ExecutorInfo::DEFAULT);
+  executorInfo.mutable_executor_id()->CopyFrom(v1::DEFAULT_EXECUTOR_ID);
+  executorInfo.mutable_resources()->CopyFrom(resources);
+  executorInfo.mutable_shutdown_grace_period()->set_nanoseconds(
+      Seconds(10).ns());
+
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected))
+    .WillRepeatedly(Return()); // Ignore teardown reconnections, see MESOS-6033.
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  Future<v1::scheduler::Event::Offers> offers;
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  subscribe(&mesos, frameworkInfo);
+
+  AWAIT_READY(subscribed);
+
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  // Update `executorInfo` with the subscribed `frameworkId`.
+  executorInfo.mutable_framework_id()->CopyFrom(frameworkId);
+
+  AWAIT_READY(offers);
+  EXPECT_NE(0, offers->offers().size());
+  const v1::Offer& offer = offers->offers(0);
+  const v1::AgentID agentId = offer.agent_id();
+
+  Future<Event::Update> updateTaskRunning;
+  Future<Event::Update> updateCheckResult;
+  Future<Event::Update> updateHealthResult;
+  Future<Event::Update> updateImplicitReconciliation;
+
+  EXPECT_CALL(*scheduler, update(_, _))
+    .WillOnce(FutureArg<1>(&updateTaskRunning))
+    .WillOnce(FutureArg<1>(&updateCheckResult))
+    .WillOnce(FutureArg<1>(&updateHealthResult))
+    .WillOnce(FutureArg<1>(&updateImplicitReconciliation))
+    .WillRepeatedly(Return()); // Ignore subsequent updates.
+
+  v1::TaskInfo taskInfo =
+      v1::createTask(agentId, resources, SLEEP_COMMAND(10000));
+
+  // Set both the check and health check intervals to a large value to
+  // prevent a second update from arriving before the reconciliation response.
+  int interval = 10;
+
+  v1::CheckInfo* checkInfo = taskInfo.mutable_check();
+  checkInfo->set_type(v1::CheckInfo::COMMAND);
+  checkInfo->set_delay_seconds(0);
+  checkInfo->set_interval_seconds(interval);
+  checkInfo->mutable_command()->mutable_command()->set_value("exit 1");
+
+  // Delay health check for 1s to ensure health update comes after check update.
+  //
+  // TODO(alexr): This can lead to flakiness on busy agents. A more robust
+  // approach could be setting the grace period to MAX_INT and making the
+  // health check pass iff a file created by the check exists. Alternatively,
+  // we can relax the expectation that the check update is delivered first.
+  v1::HealthCheck* healthCheckInfo = taskInfo.mutable_health_check();
+  healthCheckInfo->set_type(v1::HealthCheck::COMMAND);
+  healthCheckInfo->set_delay_seconds(1);
+  healthCheckInfo->set_interval_seconds(interval);
+  healthCheckInfo->mutable_command()->set_value("exit 0");
+
+  launchTask(&mesos, offer, taskInfo);
+
+  AWAIT_READY(updateTaskRunning);
+  ASSERT_EQ(TASK_RUNNING, updateTaskRunning->status().state());
+  EXPECT_EQ(taskInfo.task_id(), updateTaskRunning->status().task_id());
+
+  acknowledge(&mesos, frameworkId, updateTaskRunning->status());
+
+  AWAIT_READY(updateCheckResult);
+  const v1::TaskStatus& checkResult = updateCheckResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, checkResult.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+      checkResult.reason());
+  EXPECT_EQ(taskInfo.task_id(), checkResult.task_id());
+  EXPECT_FALSE(checkResult.has_healthy());
+  EXPECT_TRUE(checkResult.has_check_status());
+  EXPECT_TRUE(checkResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, checkResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, checkResult);
+
+  AWAIT_READY(updateHealthResult);
+  const v1::TaskStatus& healthResult = updateHealthResult->status();
+
+  ASSERT_EQ(TASK_RUNNING, healthResult.state());
+  EXPECT_EQ(taskInfo.task_id(), healthResult.task_id());
+  EXPECT_TRUE(healthResult.has_healthy());
+  EXPECT_TRUE(healthResult.healthy());
+  EXPECT_TRUE(healthResult.has_check_status());
+  EXPECT_TRUE(healthResult.check_status().command().has_exit_code());
+  EXPECT_EQ(1, healthResult.check_status().command().exit_code());
+
+  acknowledge(&mesos, frameworkId, healthResult);
+
+  // Trigger implicit reconciliation.
+  reconcile(&mesos, frameworkId, {});
+
+  AWAIT_READY(updateImplicitReconciliation);
+  const v1::TaskStatus& implicitReconciliation =
+    updateImplicitReconciliation->status();
+
+  ASSERT_EQ(TASK_RUNNING, implicitReconciliation.state());
+  ASSERT_EQ(
+      v1::TaskStatus::REASON_RECONCILIATION,
+      implicitReconciliation.reason());
+  EXPECT_EQ(taskInfo.task_id(), implicitReconciliation.task_id());
+  EXPECT_TRUE(implicitReconciliation.has_healthy());
+  EXPECT_TRUE(implicitReconciliation.healthy());
+  EXPECT_TRUE(implicitReconciliation.has_check_status());
+  EXPECT_TRUE(implicitReconciliation.check_status().command().has_exit_code());
+  EXPECT_EQ(1, implicitReconciliation.check_status().command().exit_code());
+
+  // Clean up all Mesos-launched containers.
+  Future<hashset<ContainerID>> containerIds = containerizer->containers();
+  AWAIT_READY(containerIds);
+
+  EXPECT_CALL(*scheduler, disconnected(_));
+
+  teardown(&mesos, frameworkId);
+
+  foreach (const ContainerID& containerId, containerIds.get()) {
+    AWAIT_READY(containerizer->wait(containerId));
+  }
+}
+
 
 // Verifies that an HTTP check is supported by the default executor and
 // its status is delivered in a task status update.


[2/4] mesos git commit: Enabled pause/resume for general checks.

Posted by al...@apache.org.
Enabled pause/resume for general checks.

Review: https://reviews.apache.org/r/57912/
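
For context, here is a minimal, self-contained C++ sketch (not part of the
patch; the `PausableChecker` name and the `main()` driver are illustrative)
of the pause/resume semantics introduced below: pausing only stops further
checks from being scheduled, resuming triggers an immediate check, and both
operations are idempotent.

#include <functional>
#include <iostream>
#include <utility>

class PausableChecker
{
public:
  explicit PausableChecker(std::function<void()> check)
    : check_(std::move(check)), paused_(false) {}

  void performCheck()
  {
    // Mirrors `CheckerProcess::performCheck()` below: checks are
    // silently dropped while paused.
    if (paused_) {
      return;
    }

    check_();
  }

  void pause()
  {
    // Idempotent: pausing an already paused checker is a no-op.
    if (!paused_) {
      paused_ = true;
    }
  }

  void resume()
  {
    // Idempotent: resuming schedules (here: runs) a check immediately.
    if (paused_) {
      paused_ = false;
      performCheck();
    }
  }

private:
  std::function<void()> check_;
  bool paused_;
};

int main()
{
  PausableChecker checker([]() { std::cout << "check fired" << std::endl; });

  checker.performCheck();  // Fires.
  checker.pause();
  checker.performCheck();  // Dropped while paused.
  checker.resume();        // Fires again immediately.

  return 0;
}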


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3a689ab5
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3a689ab5
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3a689ab5

Branch: refs/heads/master
Commit: 3a689ab552a9ff23dd912e0178d3c5af393f7e84
Parents: 5c9ce37
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 13:41:14 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200

----------------------------------------------------------------------
 src/checks/checker.cpp            | 52 ++++++++++++++++++++++++++++++++--
 src/checks/checker.hpp            |  7 ++---
 src/launcher/default_executor.cpp | 22 +++++++++-----
 src/launcher/executor.cpp         |  4 +--
 4 files changed, 69 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 1664acd..d1e9083 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -127,6 +127,9 @@ public:
       const Option<pid_t>& _taskPid,
       const std::vector<std::string>& _namespaces);
 
+  void pause();
+  void resume();
+
   virtual ~CheckerProcess() {}
 
 protected:
@@ -167,6 +170,7 @@ private:
   Option<lambda::function<pid_t(const lambda::function<int()>&)>> clone;
 
   CheckStatusInfo previousCheckStatus;
+  bool paused;
 };
 
 
@@ -208,9 +212,15 @@ Checker::~Checker()
 }
 
 
-void Checker::stop()
+void Checker::pause()
+{
+  dispatch(process.get(), &CheckerProcess::pause);
+}
+
+
+void Checker::resume()
 {
-  terminate(process.get(), true);
+  dispatch(process.get(), &CheckerProcess::resume);
 }
 
 
@@ -225,7 +235,8 @@ CheckerProcess::CheckerProcess(
     updateCallback(_callback),
     taskId(_taskId),
     taskPid(_taskPid),
-    namespaces(_namespaces)
+    namespaces(_namespaces),
+    paused(false)
 {
   Try<Duration> create = Duration::create(check.delay_seconds());
   CHECK_SOME(create);
@@ -286,6 +297,10 @@ void CheckerProcess::finalize()
 
 void CheckerProcess::performCheck()
 {
+  if (paused) {
+    return;
+  }
+
   Stopwatch stopwatch;
   stopwatch.start();
 
@@ -314,16 +329,47 @@ void CheckerProcess::performCheck()
 
 void CheckerProcess::scheduleNext(const Duration& duration)
 {
+  CHECK(!paused);
+
   VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration;
 
   delay(duration, self(), &Self::performCheck);
 }
 
 
+void CheckerProcess::pause()
+{
+  if (!paused) {
+    VLOG(1) << "Checking for task '" << taskId << "' paused";
+
+    paused = true;
+  }
+}
+
+
+void CheckerProcess::resume()
+{
+  if (paused) {
+    VLOG(1) << "Checking for task '" << taskId << "' resumed";
+
+    paused = false;
+
+    // Schedule a check immediately.
+    scheduleNext(Duration::zero());
+  }
+}
+
 void CheckerProcess::processCheckResult(
     const Stopwatch& stopwatch,
     const CheckStatusInfo& result)
 {
+  // `Checker` might have been paused while performing the check.
+  if (paused) {
+    LOG(INFO) << "Ignoring " << check.type() << " check result for"
+              << " task '" << taskId << "': checking is paused";
+    return;
+  }
+
   VLOG(1) << "Performed " << check.type() << " check"
           << " for task '" << taskId << "' in " << stopwatch.elapsed();
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/checks/checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.hpp b/src/checks/checker.hpp
index e8af316..1521b9c 100644
--- a/src/checks/checker.hpp
+++ b/src/checks/checker.hpp
@@ -68,10 +68,9 @@ public:
   Checker(const Checker&) = delete;
   Checker& operator=(const Checker&) = delete;
 
-  /**
-   * Immediately stops checking. Any in-flight checks are dropped.
-   */
-  void stop();
+  // Idempotent helpers for pausing and resuming checking.
+  void pause();
+  void resume();
 
 private:
   explicit Checker(process::Owned<CheckerProcess> process);

http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 606fd9c..79785fc 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -163,8 +163,12 @@ public:
       }
     }
 
-    // Pause all health checks.
+    // Pause all checks and health checks.
     foreachvalue (Owned<Container> container, containers) {
+      if (container->checker.isSome()) {
+        container->checker->get()->pause();
+      }
+
       if (container->healthChecker.isSome()) {
         container->healthChecker->get()->pause();
       }
@@ -193,8 +197,12 @@ public:
           wait(containers.keys());
         }
 
-        // Resume all health checks.
+        // Resume all checks and health checks.
         foreachvalue (Owned<Container> container, containers) {
+          if (container->checker.isSome()) {
+            container->checker->get()->resume();
+          }
+
           if (container->healthChecker.isSome()) {
             container->healthChecker->get()->resume();
           }
@@ -738,11 +746,11 @@ protected:
       deserialize<agent::Response>(contentType, response->body);
     CHECK_SOME(waitResponse);
 
-    // If there is an associated checker with the task, stop it to
-    // avoid sending check updates after a terminal status update.
+    // If the task is checked, pause the associated checker to avoid
+    // sending check updates after a terminal status update.
     if (container->checker.isSome()) {
       CHECK_NOTNULL(container->checker->get());
-      container->checker->get()->stop();
+      container->checker->get()->pause();
       container->checker = None();
     }
 
@@ -931,13 +939,13 @@ protected:
     CHECK(!container->killing);
     container->killing = true;
 
-    // If the task is checked, stop the associated checker.
+    // If the task is checked, pause the associated checker.
     //
     // TODO(alexr): Once we support `TASK_KILLING` in this executor,
     // consider continuing checking the task after sending `TASK_KILLING`.
     if (container->checker.isSome()) {
       CHECK_NOTNULL(container->checker->get());
-      container->checker->get()->stop();
+      container->checker->get()->pause();
       container->checker = None();
     }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3a689ab5/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 8bd266e..bc69beb 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -731,7 +731,7 @@ private:
 
       // Stop checking the task.
       if (checker.get() != nullptr) {
-        checker->stop();
+        checker->pause();
       }
 
       // Stop health checking the task.
@@ -776,7 +776,7 @@ private:
 
     // Stop checking the task.
     if (checker.get() != nullptr) {
-      checker->stop();
+      checker->pause();
     }
 
     // Stop health checking the task.


[4/4] mesos git commit: Improved log/failure messages in the (health)checker libraries.

Posted by al...@apache.org.
Improved log/failure messages in the (health)checker libraries.

- Make log/failure messages consistent across both libraries.
- Task and container IDs are user-generated and can contain spaces, so
  we have to wrap them in single quotes.
- Remove the redundant task IDs from 'Failure' messages.

Review: https://reviews.apache.org/r/57854/
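
A tiny standalone illustration (not part of the patch; the `taskId` value is
hypothetical) of the quoting convention adopted here: user-generated task and
container IDs may contain spaces, so they are wrapped in single quotes, while
PIDs and exit codes are left bare.

#include <iostream>
#include <string>

int main()
{
  const std::string taskId = "my task 1";  // May legitimately contain spaces.
  const int exitCode = 1;

  // Without the quotes the ID boundaries would be ambiguous:
  //   COMMAND check for task my task 1 returned: 1
  std::cout << "COMMAND check for task '" << taskId
            << "' returned: " << exitCode << std::endl;

  return 0;
}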


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5c9ce378
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5c9ce378
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5c9ce378

Branch: refs/heads/master
Commit: 5c9ce378f627cbd4c2ed16f1e342dde16d5ee939
Parents: 080e1b7
Author: Gastón Kleiman <ga...@mesosphere.io>
Authored: Thu Mar 30 12:58:38 2017 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Thu Mar 30 19:34:24 2017 +0200

----------------------------------------------------------------------
 src/checks/checker.cpp        |  60 +++++++-------
 src/checks/health_checker.cpp | 166 ++++++++++++++++++++-----------------
 src/tests/check_tests.cpp     |   6 +-
 3 files changed, 121 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/checks/checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/checker.cpp b/src/checks/checker.cpp
index 3f2d8d8..1664acd 100644
--- a/src/checks/checker.cpp
+++ b/src/checks/checker.cpp
@@ -102,13 +102,12 @@ static pid_t cloneWithSetns(
         Try<Nothing> setns = ns::setns(taskPid.get(), ns);
         if (setns.isError()) {
           // This effectively aborts the check.
-          LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
-                     << "task (pid: '" << taskPid.get() << "'): "
-                     << setns.error();
+          LOG(FATAL) << "Failed to enter the " << ns << " namespace of task"
+                     << " (pid: " << taskPid.get() << "): " << setns.error();
         }
 
-        VLOG(1) << "Entered the " << ns << " namespace of "
-                << "task (pid: '" << taskPid.get() << "') successfully";
+        VLOG(1) << "Entered the " << ns << " namespace of task"
+                << " (pid: " << taskPid.get() << ") successfully";
       }
     }
 
@@ -272,7 +271,7 @@ CheckerProcess::CheckerProcess(
 
 void CheckerProcess::initialize()
 {
-  VLOG(1) << "Check configuration for task " << taskId << ":"
+  VLOG(1) << "Check configuration for task '" << taskId << "':"
           << " '" << jsonify(JSON::Protobuf(check)) << "'";
 
   scheduleNext(checkDelay);
@@ -281,7 +280,7 @@ void CheckerProcess::initialize()
 
 void CheckerProcess::finalize()
 {
-  LOG(INFO) << "Checking for task " << taskId << " stopped";
+  LOG(INFO) << "Checking for task '" << taskId << "' stopped";
 }
 
 
@@ -315,7 +314,7 @@ void CheckerProcess::performCheck()
 
 void CheckerProcess::scheduleNext(const Duration& duration)
 {
-  VLOG(1) << "Scheduling check for task " << taskId << " in " << duration;
+  VLOG(1) << "Scheduling check for task '" << taskId << "' in " << duration;
 
   delay(duration, self(), &Self::performCheck);
 }
@@ -325,8 +324,8 @@ void CheckerProcess::processCheckResult(
     const Stopwatch& stopwatch,
     const CheckStatusInfo& result)
 {
-  VLOG(1) << "Performed " << check.type() << " check for task " << taskId
-          << " in " << stopwatch.elapsed();
+  VLOG(1) << "Performed " << check.type() << " check"
+          << " for task '" << taskId << "' in " << stopwatch.elapsed();
 
   // Trigger the callback if check info changes.
   if (result != previousCheckStatus) {
@@ -360,8 +359,8 @@ Future<int> CheckerProcess::commandCheck()
 
   if (command.shell()) {
     // Use the shell variant.
-    VLOG(1) << "Launching command check '" << command.value() << "'"
-            << " for task " << taskId;
+    VLOG(1) << "Launching COMMAND check '" << command.value() << "'"
+            << " for task '" << taskId << "'";
 
     s = process::subprocess(
         command.value(),
@@ -375,8 +374,8 @@ Future<int> CheckerProcess::commandCheck()
     vector<string> argv(
         std::begin(command.arguments()), std::end(command.arguments()));
 
-    VLOG(1) << "Launching command check [" << command.value() << ", "
-            << strings::join(", ", argv) << "] for task " << taskId;
+    VLOG(1) << "Launching COMMAND check [" << command.value() << ", "
+            << strings::join(", ", argv) << "] for task '" << taskId << "'";
 
     s = process::subprocess(
         command.value(),
@@ -407,14 +406,13 @@ Future<int> CheckerProcess::commandCheck()
 
       if (commandPid != -1) {
         // Cleanup the external command process.
-        VLOG(1) << "Killing the command check process " << commandPid
-                << " for task " << _taskId;
+        VLOG(1) << "Killing the COMMAND check process '" << commandPid
+                << "' for task '" << _taskId << "'";
 
         os::killtree(commandPid, SIGKILL);
       }
 
-      return Failure(
-          "Command timed out after " + stringify(timeout) + "; aborting");
+      return Failure("Command timed out after " + stringify(timeout));
     })
     .then([](const Option<int>& exitCode) -> Future<int> {
       if (exitCode.isNone()) {
@@ -441,15 +439,16 @@ void CheckerProcess::processCommandCheckResult(
   // see MESOS-7242.
   if (result.isReady() && WIFEXITED(result.get())) {
     const int exitCode = WEXITSTATUS(result.get());
-    VLOG(1) << check.type() << " check for task "
-            << taskId << " returned " << exitCode;
+    VLOG(1) << check.type() << " check for task '"
+            << taskId << "' returned: " << exitCode;
 
     checkStatusInfo.mutable_command()->set_exit_code(
         static_cast<int32_t>(exitCode));
   } else {
     // Check's status is currently not available, which may indicate a change
     // that should be reported as an empty `CheckStatusInfo.Command` message.
-    LOG(WARNING) << "Check for task " << taskId << " failed: "
+    LOG(WARNING) << check.type() << " check for task '" << taskId
+                 << "' failed: "
                  << (result.isFailed() ? result.failure() : "discarded");
 
     checkStatusInfo.mutable_command();
@@ -471,7 +470,7 @@ Future<int> CheckerProcess::httpCheck()
   const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
                      stringify(http.port()) + path;
 
-  VLOG(1) << "Launching HTTP check '" << url << "' for task " << taskId;
+  VLOG(1) << "Launching HTTP check '" << url << "' for task '" << taskId << "'";
 
   const vector<string> argv = {
     HTTP_CHECK_COMMAND,
@@ -522,14 +521,14 @@ Future<int> CheckerProcess::httpCheck()
       if (curlPid != -1) {
         // Cleanup the HTTP_CHECK_COMMAND process.
         VLOG(1) << "Killing the HTTP check process " << curlPid
-                << " for task " << _taskId;
+                << " for task '" << _taskId << "'";
 
         os::killtree(curlPid, SIGKILL);
       }
 
       return Failure(
           string(HTTP_CHECK_COMMAND) + " timed out after " +
-          stringify(timeout) + "; aborting");
+          stringify(timeout));
     })
     .then(defer(self(), &Self::_httpCheck, lambda::_1));
 }
@@ -595,15 +594,15 @@ void CheckerProcess::processHttpCheckResult(
   checkStatusInfo.set_type(check.type());
 
   if (result.isReady()) {
-    VLOG(1) << check.type() << " check for task "
-            << taskId << " returned " << result.get();
+    VLOG(1) << check.type() << " check for task '"
+            << taskId << "' returned: " << result.get();
 
     checkStatusInfo.mutable_http()->set_status_code(
         static_cast<uint32_t>(result.get()));
   } else {
     // Check's status is currently not available, which may indicate a change
     // that should be reported as an empty `CheckStatusInfo.Http` message.
-    LOG(WARNING) << "Check for task " << taskId << " failed: "
+    LOG(WARNING) << "Check for task '" << taskId << "' failed: "
                  << (result.isFailed() ? result.failure() : "discarded");
 
     checkStatusInfo.mutable_http();
@@ -623,7 +622,7 @@ Option<Error> checkInfo(const CheckInfo& checkInfo)
   switch (checkInfo.type()) {
     case CheckInfo::COMMAND: {
       if (!checkInfo.has_command()) {
-        return Error("Expecting 'command' to be set for command check");
+        return Error("Expecting 'command' to be set for COMMAND check");
       }
 
       const CommandInfo& command = checkInfo.command().command();
@@ -656,8 +655,7 @@ Option<Error> checkInfo(const CheckInfo& checkInfo)
 
       if (http.has_path() && !strings::startsWith(http.path(), '/')) {
         return Error(
-            "The path '" + http.path() +
-            "' of HTTP  check must start with '/'");
+            "The path '" + http.path() + "' of HTTP check must start with '/'");
       }
 
       break;
@@ -696,7 +694,7 @@ Option<Error> checkStatusInfo(const CheckStatusInfo& checkStatusInfo)
     case CheckInfo::COMMAND: {
       if (!checkStatusInfo.has_command()) {
         return Error(
-            "Expecting 'command' to be set for command check's status");
+            "Expecting 'command' to be set for COMMAND check's status");
       }
       break;
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index b768c5b..769278c 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -116,13 +116,12 @@ static pid_t cloneWithSetns(
         Try<Nothing> setns = ns::setns(taskPid.get(), ns);
         if (setns.isError()) {
           // This effectively aborts the health check.
-          LOG(FATAL) << "Failed to enter the " << ns << " namespace of "
-                     << "task (pid: '" << taskPid.get() << "'): "
-                     << setns.error();
+          LOG(FATAL) << "Failed to enter the " << ns << " namespace of task"
+                     << " (pid: '" << taskPid.get() << "'): " << setns.error();
         }
 
-        VLOG(1) << "Entered the " << ns << " namespace of "
-                << "task (pid: '" << taskPid.get() << "') successfully";
+        VLOG(1) << "Entered the " << ns << " namespace of task"
+                << " (pid: '" << taskPid.get() << "') successfully";
       }
     }
 
@@ -269,7 +268,7 @@ HealthCheckerProcess::HealthCheckerProcess(
 
 void HealthCheckerProcess::initialize()
 {
-  VLOG(1) << "Health check configuration:"
+  VLOG(1) << "Health check configuration for task '" << taskId << "':"
           << " '" << jsonify(JSON::Protobuf(check)) << "'";
 
   startTime = Clock::now();
@@ -283,14 +282,17 @@ void HealthCheckerProcess::failure(const string& message)
   if (initializing &&
       checkGracePeriod.secs() > 0 &&
       (Clock::now() - startTime) <= checkGracePeriod) {
-    LOG(INFO) << "Ignoring failure as health check still in grace period";
+    LOG(INFO) << "Ignoring failure of "
+              << HealthCheck::Type_Name(check.type()) << " health check for"
+              << " task '" << taskId << "': still in grace period";
     scheduleNext(checkInterval);
     return;
   }
 
   consecutiveFailures++;
-  LOG(WARNING) << "Health check failed " << consecutiveFailures
-               << " times consecutively: " << message;
+  LOG(WARNING) << HealthCheck::Type_Name(check.type())
+               << " health check for task '" << taskId << "' failed "
+               << consecutiveFailures << " times consecutively: " << message;
 
   bool killTask = consecutiveFailures >= check.consecutive_failures();
 
@@ -314,7 +316,8 @@ void HealthCheckerProcess::failure(const string& message)
 
 void HealthCheckerProcess::success()
 {
-  VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check passed";
+  VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check for task '"
+          << taskId << "' passed";
 
   // Send a healthy status update on the first success,
   // and on the first success following failure(s).
@@ -376,20 +379,22 @@ void HealthCheckerProcess::processCheckResult(
 {
   // `HealthChecker` might have been paused while performing the check.
   if (paused) {
-    LOG(INFO) << "Ignoring health check result of task " + stringify(taskId) +
-                 " (health checking is paused)";
+    LOG(INFO) << "Ignoring " << HealthCheck::Type_Name(check.type())
+              << " health check result for task '" << taskId
+              << "': health checking is paused";
     return;
   }
 
   if (future.isDiscarded()) {
-    LOG(INFO) << HealthCheck::Type_Name(check.type()) +
-                 " health check of task " + stringify(taskId) + " discarded";
+    LOG(INFO) << HealthCheck::Type_Name(check.type()) << " health check for"
+              << " task '" << taskId << "' discarded";
     scheduleNext(checkInterval);
     return;
   }
 
   VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type())
-          << " health check in " << stopwatch.elapsed();
+          << " health check for task '" << taskId << "' in "
+          << stopwatch.elapsed();
 
   if (future.isReady()) {
     success();
@@ -397,7 +402,8 @@ void HealthCheckerProcess::processCheckResult(
   }
 
   string message = HealthCheck::Type_Name(check.type()) +
-                   " health check failed: " + future.failure();
+                   " health check for task '" + stringify(taskId) +
+                   "' failed: " + future.failure();
 
   failure(message);
 }
@@ -422,7 +428,8 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
 
   if (command.shell()) {
     // Use the shell variant.
-    VLOG(1) << "Launching command health check '" << command.value() << "'";
+    VLOG(1) << "Launching COMMAND health check '" << command.value() << "'"
+            << " for task '" << taskId << "'";
 
     external = subprocess(
         command.value(),
@@ -438,8 +445,8 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
       argv.push_back(arg);
     }
 
-    VLOG(1) << "Launching command health check [" << command.value() << ", "
-            << strings::join(", ", argv) << "]";
+    VLOG(1) << "Launching COMMAND health check [" << command.value() << ", "
+            << strings::join(", ", argv) << "] for task '" << taskId << "'";
 
     external = subprocess(
         command.value(),
@@ -456,24 +463,27 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
     return Failure("Failed to create subprocess: " + external.error());
   }
 
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
   pid_t commandPid = external->pid();
   const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
 
   return external->status()
     .after(
         timeout,
-        [timeout, commandPid](Future<Option<int>> future) {
+        [timeout, commandPid, _taskId](Future<Option<int>> future) {
       future.discard();
 
       if (commandPid != -1) {
         // Cleanup the external command process.
-        VLOG(1) << "Killing the command health check process " << commandPid;
+        VLOG(1) << "Killing the COMMAND health check process '" << commandPid
+                << "' for task '" << _taskId << "'";
 
         os::killtree(commandPid, SIGKILL);
       }
 
-      return Failure(
-          "Command timed out after " + stringify(timeout) + "; aborting");
+      return Failure("Command timed out after " + stringify(timeout));
     })
     .then([](const Option<int>& status) -> Future<Nothing> {
       if (status.isNone()) {
@@ -482,7 +492,7 @@ Future<Nothing> HealthCheckerProcess::commandHealthCheck()
 
       int statusCode = status.get();
       if (statusCode != 0) {
-        return Failure("Command returned " + WSTRINGIFY(statusCode));
+        return Failure("Command returned: " + WSTRINGIFY(statusCode));
       }
 
       return Nothing();
@@ -497,7 +507,7 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
   CHECK(check.has_command());
   CHECK_SOME(agentURL);
 
-  VLOG(1) << "Launching command health check of task " << stringify(taskId);
+  VLOG(1) << "Launching COMMAND health check for task '" << taskId << "'";
 
   // We don't want recoverable errors, e.g., the agent responding with
   // HTTP status code 503, to trigger a health check failure.
@@ -526,11 +536,12 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
                        {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
     process::http::request(request, false)
-      .onFailed(defer(self(), [this, promise](const string& failure) {
-        LOG(WARNING) << "Connection to remove the nested container "
-                     << stringify(previousCheckContainerId.get())
-                     << " used for the command health check of task "
-                     << stringify(taskId) << " failed: " << failure;
+      .onFailed(defer(self(),
+                      [this, promise](const string& failure) {
+        LOG(WARNING) << "Connection to remove the nested container '"
+                     << previousCheckContainerId.get()
+                     << "' used for the COMMAND health check for task '"
+                     << taskId << "' failed: " << failure;
 
         // Something went wrong while sending the request, we treat this
         // as a transient failure and discard the promise.
@@ -539,14 +550,12 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
       .onReady(defer(self(), [this, promise](const Response& response) {
         if (response.code != process::http::Status::OK) {
           // The agent was unable to remove the health check container,
-          // we treat this as a transient failure and discard the
-          // promise.
+          // we treat this as a transient failure and discard the promise.
           LOG(WARNING) << "Received '" << response.status << "' ("
-                       << response.body
-                       << ") while removing the nested container "
-                       << stringify(previousCheckContainerId.get())
-                       << " used for the COMMAND health check for task"
-                       << stringify(taskId);
+                       << response.body << ") while removing the nested"
+                       << " container '" << previousCheckContainerId.get()
+                       << "' used for the COMMAND health check for task '"
+                       << taskId << "'";
 
           promise->discard();
         }
@@ -612,8 +621,8 @@ void HealthCheckerProcess::__nestedCommandHealthCheck(
                      {"Message-Accept", stringify(ContentType::PROTOBUF)},
                      {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
-  // TODO(alexr): Use lambda named captures for
-  // these cached values once they are available.
+  // TODO(alexr): Use a lambda named capture for
+  // this cached value once it is available.
   const Duration timeout = checkTimeout;
 
   auto checkTimedOut = std::make_shared<bool>(false);
@@ -637,8 +646,7 @@ void HealthCheckerProcess::__nestedCommandHealthCheck(
 
       *checkTimedOut = true;
 
-      return Failure(
-          "Command timed out after " + stringify(timeout) + "; aborting");
+      return Failure("Command timed out after " + stringify(timeout));
     }))
     .onFailed(defer(self(),
                     &Self::nestedCommandHealthCheckFailure,
@@ -664,27 +672,21 @@ void HealthCheckerProcess::___nestedCommandHealthCheck(
     // The agent was unable to launch the health check container, we
     // treat this as a transient failure.
     LOG(WARNING) << "Received '" << launchResponse.status << "' ("
-                 << launchResponse.body << ") while launching command health "
-                 << "check of task " << stringify(taskId);
+                 << launchResponse.body << ") while launching COMMAND health"
+                 << " check for task '" << taskId << "'";
 
     promise->discard();
     return;
   }
 
-  // We need to make a copy so that the lambdas can capture it.
-  const TaskID _taskId = taskId;
-
   waitNestedContainer(checkContainerId)
-    .onFailed([_taskId, promise](const string& failure) {
+    .onFailed([promise](const string& failure) {
       promise->fail(
-          "Unable to get the exit code of command health check of task " +
-          stringify(_taskId) + ": " + failure);
+          "Unable to get the exit code: " + failure);
     })
-    .onReady([_taskId, promise](const Option<int>& status) -> void {
+    .onReady([promise](const Option<int>& status) -> void {
       if (status.isNone()) {
-        promise->fail(
-            "Unable to get the exit code of command health check of task " +
-            stringify(_taskId));
+        promise->fail("Unable to get the exit code");
       // TODO(gkleiman): Make sure that the following block works on Windows.
       } else if (WIFSIGNALED(status.get()) &&
                  WTERMSIG(status.get()) == SIGKILL) {
@@ -693,9 +695,7 @@ void HealthCheckerProcess::___nestedCommandHealthCheck(
         // the result.
         promise->discard();
       } else if (status.get() != 0) {
-        promise->fail(
-            "Command health check of task " + stringify(_taskId) +
-            " returned " + WSTRINGIFY(status.get()));
+        promise->fail("Command returned: " + WSTRINGIFY(status.get()));
       } else {
         promise->set(Nothing());
       }
@@ -769,10 +769,10 @@ Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
                      {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
   return process::http::request(request, false)
-    .repair([this](const Future<Response>& future) {
+    .repair([containerId](const Future<Response>& future) {
       return Failure(
-          "Connection to wait for a health check of task " +
-          stringify(taskId) + " failed: " + future.failure());
+          "Connection to wait for health check container '" +
+          stringify(containerId) + "' failed: " + future.failure());
     })
     .then(defer(self(),
                 &Self::_waitNestedContainer, containerId, lambda::_1));
@@ -786,7 +786,8 @@ Future<Option<int>> HealthCheckerProcess::_waitNestedContainer(
   if (httpResponse.code != process::http::Status::OK) {
     return Failure(
         "Received '" + httpResponse.status + "' (" + httpResponse.body +
-        ") while waiting on health check of task " + stringify(taskId));
+        ") while waiting on health check container '" +
+        stringify(containerId) + "'");
   }
 
   Try<agent::Response> response =
@@ -814,7 +815,8 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
   const string url = scheme + "://" + DEFAULT_DOMAIN + ":" +
                      stringify(http.port()) + path;
 
-  VLOG(1) << "Launching HTTP health check '" << url << "'";
+  VLOG(1) << "Launching HTTP health check '" << url << "'"
+          << " for task '" << taskId << "'";
 
   const vector<string> argv = {
     HTTP_CHECK_COMMAND,
@@ -845,8 +847,11 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
         " subprocess: " + s.error());
   }
 
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
   pid_t curlPid = s->pid();
   const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
 
   return await(
       s->status(),
@@ -854,21 +859,22 @@ Future<Nothing> HealthCheckerProcess::httpHealthCheck()
       process::io::read(s->err().get()))
     .after(
         timeout,
-        [timeout, curlPid](Future<tuple<Future<Option<int>>,
-                                        Future<string>,
-                                        Future<string>>> future) {
+        [timeout, curlPid, _taskId](Future<tuple<Future<Option<int>>,
+                                                 Future<string>,
+                                                 Future<string>>> future) {
       future.discard();
 
       if (curlPid != -1) {
         // Cleanup the HTTP_CHECK_COMMAND process.
-        VLOG(1) << "Killing the HTTP health check process " << curlPid;
+        VLOG(1) << "Killing the HTTP health check process '" << curlPid
+                << "' for task '" << _taskId << "'";
 
         os::killtree(curlPid, SIGKILL);
       }
 
       return Failure(
           string(HTTP_CHECK_COMMAND) + " timed out after " +
-          stringify(timeout) + "; aborting");
+          stringify(timeout));
     })
     .then(defer(self(), &Self::_httpHealthCheck, lambda::_1));
 }
@@ -943,7 +949,8 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
 
   const HealthCheck::TCPCheckInfo& tcp = check.tcp();
 
-  VLOG(1) << "Launching TCP health check at port '" << tcp.port() << "'";
+  VLOG(1) << "Launching TCP health check for task '" << taskId << "' at port"
+          << tcp.port();
 
   const string tcpConnectPath = path::join(launcherDir, TCP_CHECK_COMMAND);
 
@@ -971,8 +978,11 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
         " subprocess: " + s.error());
   }
 
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
   pid_t tcpConnectPid = s->pid();
   const Duration timeout = checkTimeout;
+  const TaskID _taskId = taskId;
 
   return await(
       s->status(),
@@ -980,21 +990,22 @@ Future<Nothing> HealthCheckerProcess::tcpHealthCheck()
       process::io::read(s->err().get()))
     .after(
         timeout,
-        [timeout, tcpConnectPid](Future<tuple<Future<Option<int>>,
-                                              Future<string>,
-                                              Future<string>>> future) {
+        [timeout, tcpConnectPid, _taskId](Future<tuple<Future<Option<int>>,
+                                                       Future<string>,
+                                                       Future<string>>> future)
+    {
       future.discard();
 
       if (tcpConnectPid != -1) {
         // Cleanup the TCP_CHECK_COMMAND process.
-        VLOG(1) << "Killing the TCP health check process " << tcpConnectPid;
+        VLOG(1) << "Killing the TCP health check process " << tcpConnectPid
+                << " for task '" << _taskId << "'";
 
         os::killtree(tcpConnectPid, SIGKILL);
       }
 
       return Failure(
-          string(TCP_CHECK_COMMAND) + " timed out after " +
-          stringify(timeout) + "; aborting");
+          string(TCP_CHECK_COMMAND) + " timed out after " + stringify(timeout));
     })
     .then(defer(self(), &Self::_tcpHealthCheck, lambda::_1));
 }
@@ -1041,7 +1052,8 @@ void HealthCheckerProcess::scheduleNext(const Duration& duration)
 {
   CHECK(!paused);
 
-  VLOG(1) << "Scheduling health check in " << duration;
+  VLOG(1) << "Scheduling health check for task '" << taskId << "' in "
+          << duration;
 
   delay(duration, self(), &Self::performSingleCheck);
 }
@@ -1050,7 +1062,7 @@ void HealthCheckerProcess::scheduleNext(const Duration& duration)
 void HealthCheckerProcess::pause()
 {
   if (!paused) {
-    VLOG(1) << "Health checking paused";
+    VLOG(1) << "Health checking for task '" << taskId << "' paused";
 
     paused = true;
   }
@@ -1060,7 +1072,7 @@ void HealthCheckerProcess::pause()
 void HealthCheckerProcess::resume()
 {
   if (paused) {
-    VLOG(1) << "Health checking resumed";
+    VLOG(1) << "Health checking for task '" << taskId << "' resumed";
 
     paused = false;
 
@@ -1081,7 +1093,7 @@ Option<Error> healthCheck(const HealthCheck& check)
   switch (check.type()) {
     case HealthCheck::COMMAND: {
       if (!check.has_command()) {
-        return Error("Expecting 'command' to be set for command health check");
+        return Error("Expecting 'command' to be set for COMMAND health check");
       }
 
       const CommandInfo& command = check.command();

http://git-wip-us.apache.org/repos/asf/mesos/blob/5c9ce378/src/tests/check_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/check_tests.cpp b/src/tests/check_tests.cpp
index 16f1c07..78ac498 100644
--- a/src/tests/check_tests.cpp
+++ b/src/tests/check_tests.cpp
@@ -1028,7 +1028,7 @@ TEST_F(CheckTest, CheckInfoValidation)
     Option<Error> validate = validation::checkInfo(checkInfo);
     EXPECT_SOME(validate);
     EXPECT_EQ(
-        "Expecting 'command' to be set for command check",
+        "Expecting 'command' to be set for COMMAND check",
         validate->message);
 
     checkInfo.set_type(CheckInfo::HTTP);
@@ -1090,7 +1090,7 @@ TEST_F(CheckTest, CheckInfoValidation)
     validate = validation::checkInfo(checkInfo);
     EXPECT_SOME(validate);
     EXPECT_EQ(
-        "The path 'healthz' of HTTP  check must start with '/'",
+        "The path 'healthz' of HTTP check must start with '/'",
         validate->message);
   }
 
@@ -1160,7 +1160,7 @@ TEST_F(CheckTest, CheckStatusInfoValidation)
     Option<Error> validate = validation::checkStatusInfo(checkStatusInfo);
     EXPECT_SOME(validate);
     EXPECT_EQ(
-        "Expecting 'command' to be set for command check's status",
+        "Expecting 'command' to be set for COMMAND check's status",
         validate->message);
 
     checkStatusInfo.set_type(CheckInfo::HTTP);