You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/08/28 15:04:05 UTC
[05/10] mesos git commit: Refactored `_commandHealthCheck` in
HealthChecker.
Refactored `_commandHealthCheck` in HealthChecker.
* Remove blocking `Future::await` call.
* Adjust the level of some logs.
* Adjust style.
* Change the interface of health check handlers to
`Future<Nothing>` to make error handling easier.
Review: https://reviews.apache.org/r/51069/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/7380d130
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/7380d130
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/7380d130
Branch: refs/heads/master
Commit: 7380d130a4b03fd110acbcb0636f8615b8f28cee
Parents: f19a6aa
Author: haosdent huang <ha...@gmail.com>
Authored: Fri Aug 26 16:33:12 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Sun Aug 28 16:42:41 2016 +0200
----------------------------------------------------------------------
src/health-check/health_checker.cpp | 123 ++++++++++++++++++-------------
src/health-check/health_checker.hpp | 10 ++-
2 files changed, 78 insertions(+), 55 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/7380d130/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index 1a2a6df..097117a 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -29,6 +29,7 @@
#include <mesos/mesos.hpp>
+#include <process/collect.hpp>
#include <process/delay.hpp>
#include <process/subprocess.hpp>
@@ -45,6 +46,7 @@
using process::delay;
using process::Clock;
+using process::Failure;
using process::Future;
using process::NO_SETSID;
using process::Owned;
@@ -116,9 +118,9 @@ HealthCheckerProcess::HealthCheckerProcess(
Future<Nothing> HealthCheckerProcess::healthCheck()
{
- VLOG(2) << "Health checks starting in "
- << Seconds(check.delay_seconds()) << ", grace period "
- << Seconds(check.grace_period_seconds());
+ VLOG(1) << "Health check starting in "
+ << Seconds(check.delay_seconds()) << ", grace period "
+ << Seconds(check.grace_period_seconds());
startTime = Clock::now();
@@ -137,7 +139,8 @@ void HealthCheckerProcess::failure(const string& message)
}
consecutiveFailures++;
- VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
+ LOG(WARNING) << "Health check failed " << consecutiveFailures
+ << " times consecutively: " << message;
bool killTask = consecutiveFailures >= check.consecutive_failures();
@@ -163,7 +166,7 @@ void HealthCheckerProcess::failure(const string& message)
void HealthCheckerProcess::success()
{
- VLOG(1) << "Check passed";
+ VLOG(1) << HealthCheck::Type_Name(check.type()) << " health check passed";
// Send a healthy status update on the first success,
// and on the first success following failure(s).
@@ -182,30 +185,49 @@ void HealthCheckerProcess::success()
+// Runs one health check of the type configured in `check` by dispatching
+// to the matching handler. The handler's `Future<Nothing>` is handed to
+// `__healthCheck()`, which treats a ready future as a pass and a failed
+// or discarded one as a failure.
void HealthCheckerProcess::_healthCheck()
{
+ // Outcome of the type-specific handler selected below.
+ Future<Nothing> checkResult;
+
switch (check.type()) {
case HealthCheck::COMMAND: {
- _commandHealthCheck();
- return;
+ checkResult = _commandHealthCheck();
+ break;
}
case HealthCheck::HTTP: {
- _httpHealthCheck();
- return;
+ checkResult = _httpHealthCheck();
+ break;
}
case HealthCheck::TCP: {
- _tcpHealthCheck();
- return;
+ checkResult = _tcpHealthCheck();
+ break;
}
default: {
UNREACHABLE();
}
}
+
+ // Whatever way the check completes, process the result back on this
+ // process (via `defer(self(), ...)`) in `__healthCheck()`.
+ checkResult.onAny(defer(self(), &Self::__healthCheck, lambda::_1));
+}
+
+
+// Turns the outcome of one health check into a report: a ready future
+// counts as a pass (`success()`); a failed or discarded one counts as a
+// failure (`failure()`) whose message carries the check type and reason.
+void HealthCheckerProcess::__healthCheck(const Future<Nothing>& future)
+{
+ // Once `promise` has completed (e.g. a handler called `promise.fail()`
+ // for an unsupported check type), the checker is finished. Acting on
+ // the result now would, for instance, call `success()` and report the
+ // task healthy after the checker itself has already failed.
+ if (!promise.future().isPending()) {
+ return;
+ }
+
+ if (future.isReady()) {
+ success();
+ return;
+ }
+
+ string message = HealthCheck::Type_Name(check.type()) +
+ " health check failed: " +
+ (future.isFailed() ? future.failure() : "discarded");
+
+ failure(message);
}
-void HealthCheckerProcess::_commandHealthCheck()
+Future<Nothing> HealthCheckerProcess::_commandHealthCheck()
{
CHECK_EQ(HealthCheck::COMMAND, check.type());
CHECK(check.has_command());
@@ -220,11 +242,11 @@ void HealthCheckerProcess::_commandHealthCheck()
}
// Launch the subprocess.
- Option<Try<Subprocess>> external = None();
+ Try<Subprocess> external = Error("Not launched");
if (command.shell()) {
// Use the shell variant.
- VLOG(2) << "Launching health command '" << command.value() << "'";
+ VLOG(1) << "Launching command health check '" << command.value() << "'";
external = subprocess(
command.value(),
@@ -240,7 +262,7 @@ void HealthCheckerProcess::_commandHealthCheck()
argv.push_back(arg);
}
- VLOG(2) << "Launching health command [" << command.value() << ", "
+ VLOG(1) << "Launching command health check [" << command.value() << ", "
<< strings::join(", ", argv) << "]";
external = subprocess(
@@ -254,72 +276,69 @@ void HealthCheckerProcess::_commandHealthCheck()
environment);
}
- CHECK_SOME(external);
-
- if (external.get().isError()) {
- failure("Error creating subprocess for healthcheck: " +
- external.get().error());
- return;
+ if (external.isError()) {
+ return Failure("Failed to create subprocess: " + external.error());
}
- pid_t commandPid = external.get().get().pid();
+ pid_t commandPid = external->pid();
+ Duration timeout = Seconds(check.timeout_seconds());
- Future<Option<int>> status = external.get().get().status();
- status.await(Seconds(check.timeout_seconds()));
+ return external->status()
+ .after(timeout, [timeout, commandPid](Future<Option<int>> future) {
+ future.discard();
- if (!status.isReady()) {
- string msg = "Command check failed with reason: ";
- if (status.isFailed()) {
- msg += "failed with error: " + status.failure();
- } else if (status.isDiscarded()) {
- msg += "status future discarded";
- } else {
- msg += "status still pending after timeout " +
- stringify(Seconds(check.timeout_seconds()));
- }
+ if (commandPid != -1) {
+ // Cleanup the external command process.
+ VLOG(1) << "Killing the command health check process " << commandPid;
- if (commandPid != -1) {
- // Cleanup the external command process.
- os::killtree(commandPid, SIGKILL);
- VLOG(1) << "Kill health check command " << commandPid;
- }
+ os::killtree(commandPid, SIGKILL);
+ }
- failure(msg);
- return;
- }
+ return Failure(
+ "Command has not returned after " + stringify(timeout) +
+ "; aborting");
+ })
+ .then([](const Option<int>& status) -> Future<Nothing> {
+ if (status.isNone()) {
+ return Failure("Failed to reap the command process");
+ }
- int statusCode = status.get().get();
- if (statusCode != 0) {
- string message = "Health command check " + WSTRINGIFY(statusCode);
- failure(message);
- } else {
- success();
- }
+ int statusCode = status.get();
+ if (statusCode != 0) {
+ return Failure("Command returned " + WSTRINGIFY(statusCode));
+ }
+
+ return Nothing();
+ });
}
-void HealthCheckerProcess::_httpHealthCheck()
+// Placeholder: HTTP health checks are not supported yet, so instead of
+// running a check this fails the checker's `promise`.
+Future<Nothing> HealthCheckerProcess::_httpHealthCheck()
{
CHECK_EQ(HealthCheck::HTTP, check.type());
CHECK(check.has_http());
promise.fail("HTTP health check is not supported");
+
+ // Do not return `Nothing()`: `_healthCheck()` forwards this future to
+ // `__healthCheck()`, which treats a ready future as a passing check and
+ // would call `success()` after `promise` has already failed. A
+ // default-constructed future stays pending, so that continuation never runs.
+ return Future<Nothing>();
}
-void HealthCheckerProcess::_tcpHealthCheck()
+// Placeholder: TCP health checks are not supported yet, so instead of
+// running a check this fails the checker's `promise`.
+Future<Nothing> HealthCheckerProcess::_tcpHealthCheck()
{
CHECK_EQ(HealthCheck::TCP, check.type());
CHECK(check.has_tcp());
promise.fail("TCP health check is not supported");
+
+ // Do not return `Nothing()`: `_healthCheck()` forwards this future to
+ // `__healthCheck()`, which treats a ready future as a passing check and
+ // would call `success()` after `promise` has already failed. A
+ // default-constructed future stays pending, so that continuation never runs.
+ return Future<Nothing>();
}
+// Schedules the next `_healthCheck()` run on this process after
+// `check.interval_seconds()` seconds, logging the delay at VLOG(1).
void HealthCheckerProcess::reschedule()
{
VLOG(1) << "Rescheduling health check in "
- << Seconds(check.interval_seconds());
+ << Seconds(check.interval_seconds());
delay(Seconds(check.interval_seconds()), self(), &Self::_healthCheck);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/7380d130/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index b4548f3..83cedfb 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -76,9 +76,13 @@ private:
void _healthCheck();
- void _commandHealthCheck();
- void _httpHealthCheck();
- void _tcpHealthCheck();
+ void __healthCheck(const process::Future<Nothing>& future);
+
+ process::Future<Nothing> _commandHealthCheck();
+
+ process::Future<Nothing> _httpHealthCheck();
+
+ process::Future<Nothing> _tcpHealthCheck();
void reschedule();