You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/08/10 21:54:43 UTC
[2/2] mesos git commit: Moved the implementation of health checker to the .cpp file.
Moved the implementation of health checker to the .cpp file.
Review: https://reviews.apache.org/r/50922/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5f3b0d1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5f3b0d1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5f3b0d1
Branch: refs/heads/master
Commit: d5f3b0d17cf1a485b37a6ee1537a7baff05992d9
Parents: f511a24
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Aug 10 22:40:41 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Aug 10 23:53:28 2016 +0200
----------------------------------------------------------------------
src/health-check/health_checker.cpp | 225 ++++++++++++++++++++++++++++--
src/health-check/health_checker.hpp | 227 ++-----------------------------
2 files changed, 225 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5f3b0d1/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index 585a0b5..052c7c5 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -14,6 +14,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include "health-check/health_checker.hpp"
+
#include <signal.h>
#include <stdio.h>
#include <string.h>
@@ -27,24 +29,32 @@
#include <mesos/mesos.hpp>
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
+#include <stout/os.hpp>
#include <stout/protobuf.hpp>
-#include "health-check/health_checker.hpp"
-
-using namespace mesos;
+#include "common/status_utils.hpp"
+using process::delay;
+using process::Clock;
+using process::Future;
+using process::NO_SETSID;
+using process::Owned;
+using process::Promise;
+using process::Subprocess;
+using process::Time;
using process::UPID;
+using std::map;
+using std::string;
+using std::vector;
+
namespace mesos {
namespace internal {
-using namespace process;
-
Try<Owned<HealthChecker>> HealthChecker::create(
const HealthCheck& check,
const UPID& executor,
@@ -88,5 +98,202 @@ Future<Nothing> HealthChecker::healthCheck()
return dispatch(process.get(), &HealthCheckerProcess::healthCheck);
}
+
+HealthCheckerProcess::HealthCheckerProcess(
+ const HealthCheck& _check,
+ const UPID& _executor,
+ const TaskID& _taskID)
+ : ProcessBase(process::ID::generate("health-checker")),
+ check(_check),
+ initializing(true),
+ executor(_executor),
+ taskID(_taskID),
+ consecutiveFailures(0) {}
+
+
+// Starts periodic health checking.
+//
+// Records the start time used by the grace-period logic in failure(),
+// then schedules the first _healthCheck() after `delay_seconds`.
+// Returns the future of `promise`. Within this file the promise is
+// only ever failed, never set, so the future signals why checking
+// stopped.
+Future<Nothing> HealthCheckerProcess::healthCheck()
+{
+  const Seconds initialDelay(check.delay_seconds());
+
+  VLOG(2) << "Health checks starting in " << initialDelay
+          << ", grace period " << Seconds(check.grace_period_seconds());
+
+  startTime = Clock::now();
+
+  delay(initialDelay, self(), &Self::_healthCheck);
+
+  return promise.future();
+}
+
+
+// Handles a failed check whose cause is described by `message`.
+//
+// When a positive grace period is configured and has not yet elapsed
+// since healthCheck() was called, the failure is only logged and the
+// next check is scheduled. Otherwise:
+// - the consecutive-failure counter is incremented;
+// - an unhealthy TaskHealthStatus is sent to the executor, with
+//   `kill_task` set once the counter reaches `consecutive_failures`;
+// - in that case the promise is failed with `message` and no further
+//   check is scheduled. In every other case the next check is
+//   scheduled.
+void HealthCheckerProcess::failure(const string& message)
+{
+  const bool withinGracePeriod =
+    check.grace_period_seconds() > 0 &&
+    (Clock::now() - startTime).secs() <= check.grace_period_seconds();
+
+  if (withinGracePeriod) {
+    LOG(INFO) << "Ignoring failure as health check still in grace period";
+    reschedule();
+    return;
+  }
+
+  ++consecutiveFailures;
+  VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
+
+  const bool shouldKill = consecutiveFailures >= check.consecutive_failures();
+
+  TaskHealthStatus unhealthy;
+  unhealthy.mutable_task_id()->CopyFrom(taskID);
+  unhealthy.set_healthy(false);
+  unhealthy.set_consecutive_failures(consecutiveFailures);
+  unhealthy.set_kill_task(shouldKill);
+  send(executor, unhealthy);
+
+  if (!shouldKill) {
+    reschedule();
+    return;
+  }
+
+  // This is a hack to ensure the message is sent to the
+  // executor before we exit the process. Without this,
+  // we may exit before libprocess has sent the data over
+  // the socket. See MESOS-4111.
+  os::sleep(Seconds(1));
+  promise.fail(message);
+}
+
+
+// Handles a passed check.
+//
+// A healthy TaskHealthStatus is sent to the executor only on the very
+// first success and on the first success after one or more failures.
+// The failure counter is then reset and the next check is scheduled.
+void HealthCheckerProcess::success()
+{
+  VLOG(1) << "Check passed";
+
+  const bool shouldReport = initializing || consecutiveFailures > 0;
+
+  if (shouldReport) {
+    TaskHealthStatus healthy;
+    healthy.mutable_task_id()->CopyFrom(taskID);
+    healthy.set_healthy(true);
+    send(executor, healthy);
+
+    initializing = false;
+  }
+
+  consecutiveFailures = 0;
+  reschedule();
+}
+
+
+// Runs a single health check and reports the outcome.
+//
+// Only command checks are supported. An HTTP check, a missing command,
+// or a command without a value fails the promise returned by
+// healthCheck(), and no further check is scheduled.
+//
+// Otherwise the command is launched either through the shell or as a
+// direct exec with `arguments` as argv. It gets:
+// - stdin from /dev/null;
+// - stdout and stderr redirected to this process' stderr;
+// - this process' environment overlaid with the command's variables.
+//
+// A command that cannot be started, does not finish within
+// `timeout_seconds`, has no exit status, or exits non-zero is reported
+// through failure(). A zero exit status is reported through success().
+void HealthCheckerProcess::_healthCheck()
+{
+  if (check.has_http()) {
+    promise.fail("HTTP health check is not supported");
+    return;
+  }
+
+  if (!check.has_command()) {
+    promise.fail("No check found in health check");
+    return;
+  }
+
+  const CommandInfo& command = check.command();
+
+  map<string, string> environment = os::environment();
+
+  foreach (const Environment::Variable& variable,
+           command.environment().variables()) {
+    environment[variable.name()] = variable.value();
+  }
+
+  // Launch the subprocess.
+  Option<Try<Subprocess>> external = None();
+
+  if (command.shell()) {
+    // Use the shell variant.
+    if (!command.has_value()) {
+      promise.fail("Shell command is not specified");
+      return;
+    }
+
+    VLOG(2) << "Launching health command '" << command.value() << "'";
+
+    external = subprocess(
+        command.value(),
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        NO_SETSID,
+        environment);
+  } else {
+    // Use the exec variant.
+    if (!command.has_value()) {
+      promise.fail("Executable path is not specified");
+      return;
+    }
+
+    vector<string> argv;
+    foreach (const string& arg, command.arguments()) {
+      argv.push_back(arg);
+    }
+
+    VLOG(2) << "Launching health command [" << command.value() << ", "
+            << strings::join(", ", argv) << "]";
+
+    external = subprocess(
+        command.value(),
+        argv,
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        NO_SETSID,
+        None(),
+        environment);
+  }
+
+  CHECK_SOME(external);
+
+  if (external.get().isError()) {
+    failure("Error creating subprocess for healthcheck: " +
+            external.get().error());
+    return;
+  }
+
+  pid_t commandPid = external.get().get().pid();
+
+  const Seconds checkTimeout(check.timeout_seconds());
+
+  Future<Option<int>> status = external.get().get().status();
+
+  // NOTE(review): `await` is a blocking wait. This actor cannot handle
+  // other messages until the command exits or `checkTimeout` expires.
+  // Consider chaining on the future (e.g. `after` + `onAny`) instead.
+  status.await(checkTimeout);
+
+  if (!status.isReady()) {
+    string msg = "Command check failed with reason: ";
+    if (status.isFailed()) {
+      msg += "failed with error: " + status.failure();
+    } else if (status.isDiscarded()) {
+      msg += "status future discarded";
+    } else {
+      msg += "status still pending after timeout " + stringify(checkTimeout);
+    }
+
+    if (commandPid != -1) {
+      // Cleanup the external command process.
+      os::killtree(commandPid, SIGKILL);
+      VLOG(1) << "Kill health check command " << commandPid;
+    }
+
+    failure(msg);
+    return;
+  }
+
+  // The exit status is an Option and may be empty, presumably when the
+  // status of the command process could not be reaped. Calling get()
+  // on an empty Option is invalid, so report a failed check instead.
+  if (status.get().isNone()) {
+    failure("Command check failed with reason: unable to obtain the exit "
+            "status of the health check command");
+    return;
+  }
+
+  int statusCode = status.get().get();
+  if (statusCode != 0) {
+    string message = "Health command check " + WSTRINGIFY(statusCode);
+    failure(message);
+  } else {
+    success();
+  }
+}
+
+
+// Schedules the next _healthCheck() run `interval_seconds` from now.
+void HealthCheckerProcess::reschedule()
+{
+  const Seconds interval(check.interval_seconds());
+
+  VLOG(1) << "Rescheduling health check in " << interval;
+
+  delay(interval, self(), &Self::_healthCheck);
+}
+
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/d5f3b0d1/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index b28a9cf..0cdc2de 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -17,48 +17,15 @@
#ifndef __HEALTH_CHECKER_HPP__
#define __HEALTH_CHECKER_HPP__
-#include <signal.h>
-#include <stdio.h>
-#include <string.h>
-#ifndef __WINDOWS__
-#include <unistd.h>
-#endif // __WINDOWS__
-
-#include <iostream>
#include <string>
-#include <mesos/mesos.hpp>
-
-#include <process/delay.hpp>
#include <process/future.hpp>
-#include <process/id.hpp>
#include <process/pid.hpp>
#include <process/process.hpp>
#include <process/protobuf.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/flags.hpp>
-#include <stout/json.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/strings.hpp>
-
-#include <stout/os/killtree.hpp>
-
-#include "common/status_utils.hpp"
#include "messages/messages.hpp"
-using std::cout;
-using std::cerr;
-using std::endl;
-using std::map;
-using std::string;
-
-using process::UPID;
-
namespace mesos {
namespace internal {
@@ -70,7 +37,7 @@ class HealthChecker
public:
static Try<process::Owned<HealthChecker>> create(
const HealthCheck& check,
- const UPID& executor,
+ const process::UPID& executor,
const TaskID& taskID);
~HealthChecker();
@@ -89,202 +56,26 @@ class HealthCheckerProcess : public ProtobufProcess<HealthCheckerProcess>
public:
HealthCheckerProcess(
const HealthCheck& _check,
- const UPID& _executor,
- const TaskID& _taskID)
- : ProcessBase(process::ID::generate("health-checker")),
- check(_check),
- initializing(true),
- executor(_executor),
- taskID(_taskID),
- consecutiveFailures(0) {}
+ const process::UPID& _executor,
+ const TaskID& _taskID);
virtual ~HealthCheckerProcess() {}
- process::Future<Nothing> healthCheck()
- {
- VLOG(2) << "Health checks starting in "
- << Seconds(check.delay_seconds()) << ", grace period "
- << Seconds(check.grace_period_seconds());
-
- startTime = process::Clock::now();
-
- delay(Seconds(check.delay_seconds()), self(), &Self::_healthCheck);
- return promise.future();
- }
+ process::Future<Nothing> healthCheck();
private:
- void failure(const string& message)
- {
- if (check.grace_period_seconds() > 0 &&
- (process::Clock::now() - startTime).secs() <=
- check.grace_period_seconds()) {
- LOG(INFO) << "Ignoring failure as health check still in grace period";
- reschedule();
- return;
- }
-
- consecutiveFailures++;
- VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
-
- bool killTask = consecutiveFailures >= check.consecutive_failures();
-
- TaskHealthStatus taskHealthStatus;
- taskHealthStatus.set_healthy(false);
- taskHealthStatus.set_consecutive_failures(consecutiveFailures);
- taskHealthStatus.set_kill_task(killTask);
- taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
- send(executor, taskHealthStatus);
-
- if (killTask) {
- // This is a hack to ensure the message is sent to the
- // executor before we exit the process. Without this,
- // we may exit before libprocess has sent the data over
- // the socket. See MESOS-4111.
- os::sleep(Seconds(1));
- promise.fail(message);
- } else {
- reschedule();
- }
- }
-
- void success()
- {
- VLOG(1) << "Check passed";
-
- // Send a healthy status update on the first success,
- // and on the first success following failure(s).
- if (initializing || consecutiveFailures > 0) {
- TaskHealthStatus taskHealthStatus;
- taskHealthStatus.set_healthy(true);
- taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
- send(executor, taskHealthStatus);
- initializing = false;
- }
- consecutiveFailures = 0;
- reschedule();
- }
-
- void _healthCheck()
- {
- if (check.has_http()) {
- promise.fail("HTTP health check is not supported");
- return;
- }
-
- if (!check.has_command()) {
- promise.fail("No check found in health check");
- return;
- }
-
- const CommandInfo& command = check.command();
-
- map<string, string> environment = os::environment();
-
- foreach (const Environment::Variable& variable,
- command.environment().variables()) {
- environment[variable.name()] = variable.value();
- }
-
- // Launch the subprocess.
- Option<Try<process::Subprocess>> external = None();
-
- if (command.shell()) {
- // Use the shell variant.
- if (!command.has_value()) {
- promise.fail("Shell command is not specified");
- return;
- }
-
- VLOG(2) << "Launching health command '" << command.value() << "'";
-
- external = process::subprocess(
- command.value(),
- process::Subprocess::PATH("/dev/null"),
- process::Subprocess::FD(STDERR_FILENO),
- process::Subprocess::FD(STDERR_FILENO),
- process::NO_SETSID,
- environment);
- } else {
- // Use the exec variant.
- if (!command.has_value()) {
- promise.fail("Executable path is not specified");
- return;
- }
-
- vector<string> argv;
- foreach (const string& arg, command.arguments()) {
- argv.push_back(arg);
- }
-
- VLOG(2) << "Launching health command [" << command.value() << ", "
- << strings::join(", ", argv) << "]";
-
- external = process::subprocess(
- command.value(),
- argv,
- process::Subprocess::PATH("/dev/null"),
- process::Subprocess::FD(STDERR_FILENO),
- process::Subprocess::FD(STDERR_FILENO),
- process::NO_SETSID,
- None(),
- environment);
- }
-
- CHECK_SOME(external);
-
- if (external.get().isError()) {
- failure("Error creating subprocess for healthcheck: " +
- external.get().error());
- return;
- }
-
- pid_t commandPid = external.get().get().pid();
-
- process::Future<Option<int>> status = external.get().get().status();
- status.await(Seconds(check.timeout_seconds()));
-
- if (!status.isReady()) {
- string msg = "Command check failed with reason: ";
- if (status.isFailed()) {
- msg += "failed with error: " + status.failure();
- } else if (status.isDiscarded()) {
- msg += "status future discarded";
- } else {
- msg += "status still pending after timeout " +
- stringify(Seconds(check.timeout_seconds()));
- }
-
- if (commandPid != -1) {
- // Cleanup the external command process.
- os::killtree(commandPid, SIGKILL);
- VLOG(1) << "Kill health check command " << commandPid;
- }
-
- failure(msg);
- return;
- }
+ void failure(const std::string& message);
- int statusCode = status.get().get();
- if (statusCode != 0) {
- string message = "Health command check " + WSTRINGIFY(statusCode);
- failure(message);
- } else {
- success();
- }
- }
+ void success();
- void reschedule()
- {
- VLOG(1) << "Rescheduling health check in "
- << Seconds(check.interval_seconds());
+ void _healthCheck();
- delay(Seconds(check.interval_seconds()), self(), &Self::_healthCheck);
- }
+ void reschedule();
process::Promise<Nothing> promise;
HealthCheck check;
bool initializing;
- UPID executor;
+ process::UPID executor;
TaskID taskID;
uint32_t consecutiveFailures;
process::Time startTime;