You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/08/28 15:04:05 UTC

[05/10] mesos git commit: Refactored `_commandHealthCheck` in HealthChecker.

Refactored `_commandHealthCheck` in HealthChecker.

* Remove blocking `Future::await` call.
* Adjust the log level of some messages.
* Adjust style.
* Change the return type of the health check handlers to
  `Future<Nothing>` to make error handling easier.

Review: https://reviews.apache.org/r/51069/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7380d130
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7380d130
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7380d130

Branch: refs/heads/master
Commit: 7380d130a4b03fd110acbcb0636f8615b8f28cee
Parents: f19a6aa
Author: haosdent huang <ha...@gmail.com>
Authored: Fri Aug 26 16:33:12 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Sun Aug 28 16:42:41 2016 +0200

----------------------------------------------------------------------
 src/health-check/health_checker.cpp | 123 ++++++++++++++++++-------------
 src/health-check/health_checker.hpp |  10 ++-
 2 files changed, 78 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/7380d130/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index 1a2a6df..097117a 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -29,6 +29,7 @@
 
 #include <mesos/mesos.hpp>
 
+#include <process/collect.hpp>
 #include <process/delay.hpp>
 #include <process/subprocess.hpp>
 
@@ -45,6 +46,7 @@
 
 using process::delay;
 using process::Clock;
+using process::Failure;
 using process::Future;
 using process::NO_SETSID;
 using process::Owned;
@@ -116,9 +118,9 @@ HealthCheckerProcess::HealthCheckerProcess(
 
 Future<Nothing> HealthCheckerProcess::healthCheck()
 {
-  VLOG(2) << "Health checks starting in "
-    << Seconds(check.delay_seconds()) << ", grace period "
-    << Seconds(check.grace_period_seconds());
+  VLOG(1) << "Health check starting in "
+          << Seconds(check.delay_seconds()) << ", grace period "
+          << Seconds(check.grace_period_seconds());
 
   startTime = Clock::now();
 
@@ -137,7 +139,8 @@ void HealthCheckerProcess::failure(const string& message)
   }
 
   consecutiveFailures++;
-  VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
+  LOG(WARNING) << "Health check failed " << consecutiveFailures
+               << " times consecutively: " << message;
 
   bool killTask = consecutiveFailures >= check.consecutive_failures();
 
@@ -163,7 +166,7 @@ void HealthCheckerProcess::failure(const string& message)
 
 void HealthCheckerProcess::success()
 {
-  VLOG(1) << "Check passed";
+  VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check passed";
 
   // Send a healthy status update on the first success,
   // and on the first success following failure(s).
@@ -182,30 +185,49 @@ void HealthCheckerProcess::success()
 
 void HealthCheckerProcess::_healthCheck()
 {
+  Future<Nothing> checkResult;
+
   switch (check.type()) {
     case HealthCheck::COMMAND: {
-      _commandHealthCheck();
-      return;
+      checkResult = _commandHealthCheck();
+      break;
     }
 
     case HealthCheck::HTTP: {
-      _httpHealthCheck();
-      return;
+      checkResult = _httpHealthCheck();
+      break;
     }
 
     case HealthCheck::TCP: {
-      _tcpHealthCheck();
-      return;
+      checkResult = _tcpHealthCheck();
+      break;
     }
 
     default: {
       UNREACHABLE();
     }
   }
+
+  checkResult.onAny(defer(self(), &Self::__healthCheck, lambda::_1));
+}
+
+
+void HealthCheckerProcess::__healthCheck(const Future<Nothing>& future)
+{
+  if (future.isReady()) {
+    success();
+    return;
+  }
+
+  string message = HealthCheck::Type_Name(check.type()) +
+                   " health check failed: " +
+                   (future.isFailed() ? future.failure() : "discarded");
+
+  failure(message);
 }
 
 
-void HealthCheckerProcess::_commandHealthCheck()
+Future<Nothing> HealthCheckerProcess::_commandHealthCheck()
 {
   CHECK_EQ(HealthCheck::COMMAND, check.type());
   CHECK(check.has_command());
@@ -220,11 +242,11 @@ void HealthCheckerProcess::_commandHealthCheck()
   }
 
   // Launch the subprocess.
-  Option<Try<Subprocess>> external = None();
+  Try<Subprocess> external = Error("Not launched");
 
   if (command.shell()) {
     // Use the shell variant.
-    VLOG(2) << "Launching health command '" << command.value() << "'";
+    VLOG(1) << "Launching command health check '" << command.value() << "'";
 
     external = subprocess(
         command.value(),
@@ -240,7 +262,7 @@ void HealthCheckerProcess::_commandHealthCheck()
       argv.push_back(arg);
     }
 
-    VLOG(2) << "Launching health command [" << command.value() << ", "
+    VLOG(1) << "Launching command health check [" << command.value() << ", "
             << strings::join(", ", argv) << "]";
 
     external = subprocess(
@@ -254,72 +276,69 @@ void HealthCheckerProcess::_commandHealthCheck()
         environment);
   }
 
-  CHECK_SOME(external);
-
-  if (external.get().isError()) {
-    failure("Error creating subprocess for healthcheck: " +
-            external.get().error());
-    return;
+  if (external.isError()) {
+    return Failure("Failed to create subprocess: " + external.error());
   }
 
-  pid_t commandPid = external.get().get().pid();
+  pid_t commandPid = external->pid();
+  Duration timeout = Seconds(check.timeout_seconds());
 
-  Future<Option<int>> status = external.get().get().status();
-  status.await(Seconds(check.timeout_seconds()));
+  return external->status()
+    .after(timeout, [timeout, commandPid](Future<Option<int>> future) {
+      future.discard();
 
-  if (!status.isReady()) {
-    string msg = "Command check failed with reason: ";
-    if (status.isFailed()) {
-      msg += "failed with error: " + status.failure();
-    } else if (status.isDiscarded()) {
-      msg += "status future discarded";
-    } else {
-      msg += "status still pending after timeout " +
-             stringify(Seconds(check.timeout_seconds()));
-    }
+      if (commandPid != -1) {
+        // Cleanup the external command process.
+        VLOG(1) << "Killing the command health check process " << commandPid;
 
-    if (commandPid != -1) {
-      // Cleanup the external command process.
-      os::killtree(commandPid, SIGKILL);
-      VLOG(1) << "Kill health check command " << commandPid;
-    }
+        os::killtree(commandPid, SIGKILL);
+      }
 
-    failure(msg);
-    return;
-  }
+      return Failure(
+          "Command has not returned after " + stringify(timeout) +
+          "; aborting");
+    })
+    .then([](const Option<int>& status) -> Future<Nothing> {
+      if (status.isNone()) {
+        return Failure("Failed to reap the command process");
+      }
 
-  int statusCode = status.get().get();
-  if (statusCode != 0) {
-    string message = "Health command check " + WSTRINGIFY(statusCode);
-    failure(message);
-  } else {
-    success();
-  }
+      int statusCode = status.get();
+      if (statusCode != 0) {
+        return Failure("Command returned " + WSTRINGIFY(statusCode));
+      }
+
+      return Nothing();
+    });
 }
 
 
-void HealthCheckerProcess::_httpHealthCheck()
+Future<Nothing> HealthCheckerProcess::_httpHealthCheck()
 {
   CHECK_EQ(HealthCheck::HTTP, check.type());
   CHECK(check.has_http());
 
   promise.fail("HTTP health check is not supported");
+
+  return Nothing();
 }
 
 
-void HealthCheckerProcess::_tcpHealthCheck()
+Future<Nothing> HealthCheckerProcess::_tcpHealthCheck()
 {
   CHECK_EQ(HealthCheck::TCP, check.type());
   CHECK(check.has_tcp());
 
   promise.fail("TCP health check is not supported");
+
+  return Nothing();
 }
 
 
 void HealthCheckerProcess::reschedule()
 {
   VLOG(1) << "Rescheduling health check in "
-    << Seconds(check.interval_seconds());
+          << Seconds(check.interval_seconds());
 
   delay(Seconds(check.interval_seconds()), self(), &Self::_healthCheck);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/7380d130/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index b4548f3..83cedfb 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -76,9 +76,13 @@ private:
 
   void _healthCheck();
 
-  void _commandHealthCheck();
-  void _httpHealthCheck();
-  void _tcpHealthCheck();
+  void __healthCheck(const process::Future<Nothing>& future);
+
+  process::Future<Nothing> _commandHealthCheck();
+
+  process::Future<Nothing> _httpHealthCheck();
+
+  process::Future<Nothing> _tcpHealthCheck();
 
   void reschedule();