You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/11/25 15:19:48 UTC
[09/11] mesos git commit: Used callback instead of `send()` for
health status updates.
Used callback instead of `send()` for health status updates.
Since HealthChecker is now used as a library only and does not live
in a separate OS process, there is no need to use libprocess message
sending for health status updates; a callback will do.
Review: https://reviews.apache.org/r/52872/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14b5e94d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14b5e94d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14b5e94d
Branch: refs/heads/master
Commit: 14b5e94dbecde5f2e72ce64d7c8e0384e746ef3d
Parents: 46e348d
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Fri Nov 25 16:14:29 2016 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Nov 25 16:14:29 2016 +0100
----------------------------------------------------------------------
src/docker/executor.cpp | 27 ++++++++-------------------
src/health-check/health_checker.cpp | 13 ++++++-------
src/health-check/health_checker.hpp | 14 +++++++++-----
src/launcher/default_executor.cpp | 29 +++++++++++------------------
src/launcher/executor.cpp | 21 ++++++---------------
5 files changed, 40 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index e9d65ef..94e116f 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -32,6 +32,7 @@
#include <stout/error.hpp>
#include <stout/flags.hpp>
#include <stout/json.hpp>
+#include <stout/lambda.hpp>
#include <stout/os.hpp>
#include <stout/protobuf.hpp>
#include <stout/try.hpp>
@@ -277,19 +278,7 @@ public:
void error(ExecutorDriver* driver, const string& message) {}
protected:
- virtual void initialize()
- {
- install<TaskHealthStatus>(
- &Self::taskHealthUpdated,
- &TaskHealthStatus::task_id,
- &TaskHealthStatus::healthy,
- &TaskHealthStatus::kill_task);
- }
-
- void taskHealthUpdated(
- const TaskID& taskID,
- const bool& healthy,
- const bool& initiateTaskKill)
+ void taskHealthUpdated(const TaskHealthStatus& healthStatus)
{
if (driver.isNone()) {
return;
@@ -302,11 +291,11 @@ protected:
}
cout << "Received task health update, healthy: "
- << stringify(healthy) << endl;
+ << stringify(healthStatus.healthy()) << endl;
TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskID);
- status.set_healthy(healthy);
+ status.mutable_task_id()->CopyFrom(healthStatus.task_id());
+ status.set_healthy(healthStatus.healthy());
status.set_state(TASK_RUNNING);
if (containerNetworkInfo.isSome()) {
@@ -316,9 +305,9 @@ protected:
driver.get()->sendStatusUpdate(status);
- if (initiateTaskKill) {
+ if (healthStatus.kill_task()) {
killedByHealthCheck = true;
- killTask(driver.get(), taskID);
+ killTask(driver.get(), healthStatus.task_id());
}
}
@@ -540,7 +529,7 @@ private:
health::HealthChecker::create(
healthCheck,
launcherDir,
- self(),
+ defer(self(), &Self::taskHealthUpdated, lambda::_1),
task.task_id(),
containerPid,
namespaces);
http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index b769ecd..7cdf5df 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -61,7 +61,6 @@ using process::Future;
using process::Owned;
using process::Subprocess;
using process::Time;
-using process::UPID;
using std::map;
using std::string;
@@ -120,7 +119,7 @@ pid_t cloneWithSetns(
Try<Owned<HealthChecker>> HealthChecker::create(
const HealthCheck& check,
const string& launcherDir,
- const UPID& executor,
+ const lambda::function<void(const TaskHealthStatus&)>& callback,
const TaskID& taskID,
Option<pid_t> taskPid,
const vector<string>& namespaces)
@@ -134,7 +133,7 @@ Try<Owned<HealthChecker>> HealthChecker::create(
Owned<HealthCheckerProcess> process(new HealthCheckerProcess(
check,
launcherDir,
- executor,
+ callback,
taskID,
taskPid,
namespaces));
@@ -169,15 +168,15 @@ void HealthChecker::stop()
HealthCheckerProcess::HealthCheckerProcess(
const HealthCheck& _check,
const string& _launcherDir,
- const UPID& _executor,
+ const lambda::function<void(const TaskHealthStatus&)>& _callback,
const TaskID& _taskID,
Option<pid_t> _taskPid,
const vector<string>& _namespaces)
: ProcessBase(process::ID::generate("health-checker")),
check(_check),
+ healthUpdateCallback(_callback),
launcherDir(_launcherDir),
initializing(true),
- executor(_executor),
taskID(_taskID),
taskPid(_taskPid),
namespaces(_namespaces),
@@ -243,7 +242,7 @@ void HealthCheckerProcess::failure(const string& message)
// We assume this is a local send, i.e. the health checker library
// is not used in a binary external to the executor and hence can
// not exit before the data is sent to the executor.
- send(executor, taskHealthStatus);
+ healthUpdateCallback(taskHealthStatus);
// Even if we set the `kill_task` flag, it is an executor who kills the task
// and honors the flag (or not). We have no control over the task's lifetime,
@@ -262,7 +261,7 @@ void HealthCheckerProcess::success()
TaskHealthStatus taskHealthStatus;
taskHealthStatus.set_healthy(true);
taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
- send(executor, taskHealthStatus);
+ healthUpdateCallback(taskHealthStatus);
initializing = false;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index bd7b753..8d410a8 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -31,6 +31,7 @@
#include <process/time.hpp>
#include <stout/duration.hpp>
+#include <stout/lambda.hpp>
#include <stout/nothing.hpp>
#include "messages/messages.hpp"
@@ -50,20 +51,23 @@ public:
* checking starts immediately after initialization.
*
* @param check The protobuf message definition of health check.
- * @param executor The executor UPID to which health check results will be
- * reported.
* @param launcherDir A directory where Mesos helper binaries are located.
+ * @param callback A callback HealthChecker uses to send health status
+ * updates to its owner (usually an executor).
* @param taskID The TaskID of the target task.
* @param taskPid The target task's pid used to enter the specified
* namespaces.
* @param namespaces The namespaces to enter prior performing a single health
* check.
* @return A `HealthChecker` object or an error if `create` fails.
+ *
+ * @todo A better approach would be to return a stream of updates, e.g.,
+ * `process::Stream<TaskHealthStatus>` rather than invoking a callback.
*/
static Try<process::Owned<HealthChecker>> create(
const HealthCheck& check,
const std::string& launcherDir,
- const process::UPID& executor,
+ const lambda::function<void(const TaskHealthStatus&)>& callback,
const TaskID& taskID,
Option<pid_t> taskPid,
const std::vector<std::string>& namespaces);
@@ -88,7 +92,7 @@ public:
HealthCheckerProcess(
const HealthCheck& _check,
const std::string& _launcherDir,
- const process::UPID& _executor,
+ const lambda::function<void(const TaskHealthStatus&)>& _callback,
const TaskID& _taskID,
Option<pid_t> _taskPid,
const std::vector<std::string>& _namespaces);
@@ -131,9 +135,9 @@ private:
Duration checkGracePeriod;
Duration checkTimeout;
+ lambda::function<void(const TaskHealthStatus&)> healthUpdateCallback;
std::string launcherDir;
bool initializing;
- process::UPID executor;
TaskID taskID;
Option<pid_t> taskPid;
std::vector<std::string> namespaces;
http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 99a1f5e..6e377d4 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -37,6 +37,7 @@
#include <stout/flags.hpp>
#include <stout/fs.hpp>
+#include <stout/lambda.hpp>
#include <stout/linkedhashmap.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -201,12 +202,6 @@ public:
protected:
virtual void initialize()
{
- install<TaskHealthStatus>(
- &Self::taskHealthUpdated,
- &TaskHealthStatus::task_id,
- &TaskHealthStatus::healthy,
- &TaskHealthStatus::kill_task);
-
mesos.reset(new Mesos(
contentType,
defer(self(), &Self::connected),
@@ -407,7 +402,7 @@ protected:
health::HealthChecker::create(
task.health_check(),
launcherDirectory,
- self(),
+ defer(self(), &Self::taskHealthUpdated, lambda::_1),
taskId,
None(),
vector<string>());
@@ -830,27 +825,25 @@ protected:
shutdown();
}
- void taskHealthUpdated(
- const TaskID& taskId,
- bool healthy,
- bool initiateTaskKill)
+ void taskHealthUpdated(const TaskHealthStatus& healthStatus)
{
// This prevents us from sending `TASK_RUNNING` after a terminal status
// update, because we may receive an update from a health check scheduled
// before the task has been waited on.
- if (!checkers.contains(taskId)) {
+ if (!checkers.contains(healthStatus.task_id())) {
return;
}
- LOG(INFO) << "Received task health update for task '" << taskId
- << "', task is "
- << (healthy ? "healthy" : "not healthy");
+ LOG(INFO) << "Received task health update for task"
+ << " '" << healthStatus.task_id() << "', task is "
+ << (healthStatus.healthy() ? "healthy" : "not healthy");
- update(taskId, TASK_RUNNING, None(), healthy);
+ update(
+ healthStatus.task_id(), TASK_RUNNING, None(), healthStatus.healthy());
- if (initiateTaskKill) {
+ if (healthStatus.kill_task()) {
unhealthy = true;
- killTask(taskId);
+ killTask(healthStatus.task_id());
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 38fe4fb..cd5aa46 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -259,12 +259,6 @@ public:
protected:
virtual void initialize()
{
- install<TaskHealthStatus>(
- &CommandExecutor::taskHealthUpdated,
- &TaskHealthStatus::task_id,
- &TaskHealthStatus::healthy,
- &TaskHealthStatus::kill_task);
-
Option<string> value = os::getenv("MESOS_HTTP_COMMAND_EXECUTOR");
// We initialize the library here to ensure that callbacks are only invoked
@@ -297,10 +291,7 @@ protected:
}
}
- void taskHealthUpdated(
- const TaskID& _taskId,
- const bool healthy,
- const bool initiateTaskKill)
+ void taskHealthUpdated(const TaskHealthStatus& healthStatus)
{
// This check prevents us from sending `TASK_RUNNING` updates
// after the task has been transitioned to `TASK_KILLING`.
@@ -309,13 +300,13 @@ protected:
}
cout << "Received task health update, healthy: "
- << stringify(healthy) << endl;
+ << stringify(healthStatus.healthy()) << endl;
- update(_taskId, TASK_RUNNING, healthy);
+ update(healthStatus.task_id(), TASK_RUNNING, healthStatus.healthy());
- if (initiateTaskKill) {
+ if (healthStatus.kill_task()) {
killedByHealthCheck = true;
- kill(_taskId);
+ kill(healthStatus.task_id());
}
}
@@ -454,7 +445,7 @@ protected:
health::HealthChecker::create(
task->health_check(),
launcherDir,
- self(),
+ defer(self(), &Self::taskHealthUpdated, lambda::_1),
task->task_id(),
pid,
namespaces);