You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/24 04:36:11 UTC

[5/6] mesos git commit: Made COMMAND health checks resilient to agent failovers.

Made COMMAND health checks resilient to agent failovers.

Review: https://reviews.apache.org/r/57646/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0c0fbc57
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0c0fbc57
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0c0fbc57

Branch: refs/heads/master
Commit: 0c0fbc57bed2ab26dff516491c6264f37d14cd4f
Parents: 85edc8f
Author: Gast�n Kleiman <ga...@mesosphere.io>
Authored: Fri Mar 24 00:50:04 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Mar 24 05:29:12 2017 +0100

----------------------------------------------------------------------
 src/checks/health_checker.cpp | 228 +++++++++++++++++++++++--------------
 src/checks/health_checker.hpp |  16 ++-
 2 files changed, 155 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.cpp b/src/checks/health_checker.cpp
index 3290eb6..1c098d1 100644
--- a/src/checks/health_checker.cpp
+++ b/src/checks/health_checker.cpp
@@ -34,6 +34,7 @@
 #include <process/collect.hpp>
 #include <process/delay.hpp>
 #include <process/dispatch.hpp>
+#include <process/future.hpp>
 #include <process/http.hpp>
 #include <process/io.hpp>
 #include <process/subprocess.hpp>
@@ -68,6 +69,7 @@ using process::Clock;
 using process::Failure;
 using process::Future;
 using process::Owned;
+using process::Promise;
 using process::Subprocess;
 using process::Time;
 
@@ -75,6 +77,7 @@ using process::http::Connection;
 using process::http::Response;
 
 using std::map;
+using std::shared_ptr;
 using std::string;
 using std::tuple;
 using std::vector;
@@ -377,6 +380,13 @@ void HealthCheckerProcess::processCheckResult(
     return;
   }
 
+  if (future.isDiscarded()) {
+    LOG(INFO) << HealthCheck::Type_Name(check.type()) +
+                 " health check of task " + stringify(taskId) + " discarded";
+    scheduleNext(checkInterval);
+    return;
+  }
+
   VLOG(1) << "Performed " << HealthCheck::Type_Name(check.type())
           << " health check in " << stopwatch.elapsed();
 
@@ -386,8 +396,7 @@ void HealthCheckerProcess::processCheckResult(
   }
 
   string message = HealthCheck::Type_Name(check.type()) +
-                   " health check failed: " +
-                   (future.isFailed() ? future.failure() : "discarded");
+                   " health check failed: " + future.failure();
 
   failure(message);
 }
@@ -489,16 +498,37 @@ Future<Nothing> HealthCheckerProcess::nestedCommandHealthCheck()
 
   VLOG(1) << "Launching command health check of task " << stringify(taskId);
 
-  return process::http::connect(agentURL.get())
-    .repair([](const Future<Connection>& future) {
-      return Failure(
-          "Unable to establish connection with the agent: " + future.failure());
-    })
-    .then(defer(self(), &Self::_nestedCommandHealthCheck, lambda::_1));
+  // We don't want recoverable errors, e.g., the agent responding with
+  // HTTP status code 503, to trigger a health check failure.
+  //
+  // The future returned by this method represents the result of a
+  // health check. It will be set to `Nothing` if the check succeeded,
+  // to a `Failure` if it failed, and discarded if there was a transient
+  // error.
+  auto promise = std::make_shared<Promise<Nothing>>();
+
+  // TODO(alexr): Use lambda a named capture for
+  // this cached value once it is available.
+  const TaskID _taskId = taskId;
+
+  process::http::connect(agentURL.get())
+    .onFailed(defer(self(), [_taskId, promise](const string& failure) {
+      LOG(WARNING) << "Unable to establish connection with the agent to launch"
+                   << " COMMAND health check for task '" << _taskId
+                   << "': " << failure;
+
+      // We treat this as a transient failure.
+      promise->discard();
+    }))
+    .onReady(defer(self(),
+                   &Self::_nestedCommandHealthCheck, promise, lambda::_1));
+
+  return promise->future();
 }
 
 
-Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
+void HealthCheckerProcess::_nestedCommandHealthCheck(
+    shared_ptr<process::Promise<Nothing>> promise,
     Connection connection)
 {
   // TODO(gkleiman): Don't reuse the `ContainerID`, it is not safe.
@@ -525,6 +555,12 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
                      {"Message-Accept", stringify(ContentType::PROTOBUF)},
                      {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
+  // TODO(alexr): Use lambda named captures for
+  // these cached values once they are available.
+  const Duration timeout = checkTimeout;
+
+  auto checkTimedOut = std::make_shared<bool>(false);
+
   // `LAUNCH_NESTED_CONTAINER_SESSION` returns a streamed response with
   // the output of the container. The agent will close the stream once
   // the container has exited, or kill the container if the client
@@ -537,100 +573,123 @@ Future<Nothing> HealthCheckerProcess::_nestedCommandHealthCheck(
   // This means that this future will not be completed until after the
   // health check command has finished or the connection has been
   // closed.
-  return connection.send(request, false)
+  connection.send(request, false)
     .after(checkTimeout,
-           defer(self(),
-                 &Self::nestedCommandHealthCheckTimedOut,
-                 checkContainerId,
-                 connection,
-                 lambda::_1))
-    .then(defer(self(),
-                &Self::__nestedCommandHealthCheck,
-                checkContainerId,
-                lambda::_1));
+           defer(self(), [timeout, checkTimedOut](Future<Response> future) {
+      future.discard();
+
+      *checkTimedOut = true;
+
+      return Failure(
+          "Command timed out after " + stringify(timeout) + "; aborting");
+    }))
+    .onFailed(defer(self(),
+                    &Self::nestedCommandHealthCheckFailure,
+                    promise,
+                    connection,
+                    checkContainerId,
+                    checkTimedOut,
+                    lambda::_1))
+    .onReady(defer(self(),
+                   &Self::__nestedCommandHealthCheck,
+                   promise,
+                   checkContainerId,
+                   lambda::_1));
 }
 
 
-Future<Nothing> HealthCheckerProcess::__nestedCommandHealthCheck(
+void HealthCheckerProcess::__nestedCommandHealthCheck(
+    shared_ptr<process::Promise<Nothing>> promise,
     const ContainerID& checkContainerId,
     const Response& launchResponse)
 {
   if (launchResponse.code != process::http::Status::OK) {
-    return Failure(
-        "Received '" + launchResponse.status + "' (" + launchResponse.body +
-        ") while launching a command health check of task '" +
-        stringify(taskId) + "'");
+    // The agent was unable to launch the health check container, we
+    // treat this as a transient failure.
+    LOG(WARNING) << "Received '" << launchResponse.status << "' ("
+                 << launchResponse.body << ") while launching command health "
+                 << "check of task " << stringify(taskId);
+
+    promise->discard();
+    return;
   }
 
   // We need to make a copy so that the lambdas can capture it.
-  const TaskID taskId_ = taskId;
+  const TaskID _taskId = taskId;
 
-  return waitNestedContainer(checkContainerId)
-    .repair([taskId_](const Future<Option<int>>& future) {
-      return Failure(
-          "Unable to get the exit code of command health check of task '" +
-          stringify(taskId_) + "': " + future.failure());
+  waitNestedContainer(checkContainerId)
+    .onFailed([_taskId, promise](const string& failure) {
+      promise->fail(
+          "Unable to get the exit code of command health check of task " +
+          stringify(_taskId) + ": " + failure);
     })
-    .then([taskId_](const Option<int> status) -> Future<Nothing> {
+    .onReady([_taskId, promise](const Option<int>& status) -> void {
       if (status.isNone()) {
-        return Failure(
-            "Unable to get the exit code of command health check of task '" +
-            stringify(taskId_) + "'");
+        promise->fail(
+            "Unable to get the exit code of command health check of task " +
+            stringify(_taskId));
+      // TODO(gkleiman): Make sure that the following block works on Windows.
+      } else if (WIFSIGNALED(status.get()) &&
+                 WTERMSIG(status.get()) == SIGKILL) {
+        // The check container was signaled, probably because the task
+        // finished while the check was still in-flight, so we discard
+        // the result.
+        promise->discard();
       } else if (status.get() != 0) {
-        return Failure(
-            "Command health check of task '" + stringify(taskId_) +
-            "' returned " + WSTRINGIFY(status.get()));
+        promise->fail(
+            "Command health check of task " + stringify(_taskId) +
+            " returned " + WSTRINGIFY(status.get()));
       } else {
-        return Nothing();
+        promise->set(Nothing());
       }
     });
 }
 
 
-Future<Response>
-HealthCheckerProcess::nestedCommandHealthCheckTimedOut(
-    const ContainerID& checkContainerId,
+void HealthCheckerProcess::nestedCommandHealthCheckFailure(
+    shared_ptr<Promise<Nothing>> promise,
     Connection connection,
-    Future<Response> future)
+    ContainerID checkContainerId,
+    shared_ptr<bool> checkTimedOut,
+    const string& failure)
 {
-  future.discard();
-
-  // Closing the connection will make the agent kill the container.
-  connection.disconnect();
-
-  const Failure failure = Failure(
-      "Command health check of task '" + stringify(taskId) +
-      "' has timed out after " + stringify(checkTimeout));
-
-  // We need to make a copy so that the lambda can capture it.
-  const TaskID taskId_ = taskId;
-
-  // If the health check delay interval is zero, we'll try to perform
-  // another health check right after we finish processing the current
-  // timeout.
-  //
-  // All the containers created for the health checks reuse the same
-  // `ContainerID`. In order to prevent conflicts, the future returned
-  // by this method should only be completed once we're sure that the
-  // container has been cleaned up.
-  return waitNestedContainer(checkContainerId)
-    .repair([failure, taskId_](const Future<Option<int>>& waitFuture) {
-      // We assume that once `WaitNestedContainer` returns, irrespective of
-      // whether the response contains a failure, the container will be in a
-      // terminal state, so starting a new health check will not lead to a
-      // transient failure.
-      //
-      // This means that we don't need to retry the `WaitNestedContainer`
-      // call.
-      LOG(WARNING) << "Unable to get the exit code of command health check of "
-                   << "task '" << stringify(taskId_)
-                   << "': " << waitFuture.failure();
-
-      return Future<Option<int>>(failure);
-    })
-    .then([failure](const Option<int>&) {
-      return Future<Response>(failure);
-    });
+  if (*checkTimedOut) {
+    // The health check timed out, closing the connection will make the
+    // agent kill the container.
+    connection.disconnect();
+
+    // If the health check delay interval is zero, we'll try to perform
+    // another health check right after we finish processing the current
+    // timeout.
+    //
+    // All the containers created for the health checks reuse the same
+    // `ContainerID`. In order to prevent conflicts, the promise should
+    // be completed once we're sure that the container has been cleaned
+    // up.
+    waitNestedContainer(checkContainerId)
+      .onAny([failure, promise](const Future<Option<int>>&) {
+        // We assume that once `WaitNestedContainer` returns, irrespective of
+        // whether the response contains a failure, the container will be in a
+        // terminal state, so starting a new health check will not lead to a
+        // transient failure.
+        //
+        // This means that we don't need to retry the `WaitNestedContainer`
+        // call.
+        promise->fail(failure);
+      });
+  } else {
+    // The agent was not able to complete the request, discarding the
+    // promise signals the health checker that it should retry the
+    // health check.
+    //
+    // This will allow us to recover from a blip. The executor will
+    // pause the health checker when it detects that the agent is not
+    // available.
+    LOG(WARNING) << "Connection to the agent to launch COMMAND health check"
+                 << " for task '" << taskId << "' failed: " << failure;
+
+    promise->discard();
+  }
 }
 
 
@@ -653,10 +712,13 @@ Future<Option<int>> HealthCheckerProcess::waitNestedContainer(
                      {"Content-Type", stringify(ContentType::PROTOBUF)}};
 
   return process::http::request(request, false)
+    .repair([this](const Future<Response>& future) {
+      return Failure(
+          "Connection to wait for a health check of task " +
+          stringify(taskId) + " failed: " + future.failure());
+    })
     .then(defer(self(),
-                &Self::_waitNestedContainer,
-                containerId,
-                lambda::_1));
+                &Self::_waitNestedContainer, containerId, lambda::_1));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0c0fbc57/src/checks/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/checks/health_checker.hpp b/src/checks/health_checker.hpp
index 29df49b..3d17ea8 100644
--- a/src/checks/health_checker.hpp
+++ b/src/checks/health_checker.hpp
@@ -17,6 +17,7 @@
 #ifndef __HEALTH_CHECKER_HPP__
 #define __HEALTH_CHECKER_HPP__
 
+#include <memory>
 #include <string>
 #include <tuple>
 #include <vector>
@@ -156,18 +157,21 @@ private:
 
   process::Future<Nothing> nestedCommandHealthCheck();
 
-  process::Future<Nothing> _nestedCommandHealthCheck(
+  void _nestedCommandHealthCheck(
+      std::shared_ptr<process::Promise<Nothing>> promise,
       process::http::Connection connection);
 
-  process::Future<Nothing> __nestedCommandHealthCheck(
+  void __nestedCommandHealthCheck(
+      std::shared_ptr<process::Promise<Nothing>> promise,
       const ContainerID& checkContainerId,
       const process::http::Response& launchResponse);
 
-  process::Future<process::http::Response>
-  nestedCommandHealthCheckTimedOut(
-      const ContainerID& checkContainerId,
+  void nestedCommandHealthCheckFailure(
+      std::shared_ptr<process::Promise<Nothing>> promise,
       process::http::Connection connection,
-      process::Future<process::http::Response> future);
+      ContainerID checkContainerId,
+      std::shared_ptr<bool> checkTimedOut,
+      const std::string& failure);
 
   /**
    * Waits for a container to be terminated.