You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/11/25 15:19:48 UTC

[09/11] mesos git commit: Used callback instead of `send()` for health status updates.

Used callback instead of `send()` for health status updates.

Since HealthChecker is now used as a library only and does not live
in a separate OS process, there is no need to use libprocess message
sending for health status updates; a callback will do.

Review: https://reviews.apache.org/r/52872/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/14b5e94d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/14b5e94d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/14b5e94d

Branch: refs/heads/master
Commit: 14b5e94dbecde5f2e72ce64d7c8e0384e746ef3d
Parents: 46e348d
Author: Alexander Rukletsov <ru...@gmail.com>
Authored: Fri Nov 25 16:14:29 2016 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Fri Nov 25 16:14:29 2016 +0100

----------------------------------------------------------------------
 src/docker/executor.cpp             | 27 ++++++++-------------------
 src/health-check/health_checker.cpp | 13 ++++++-------
 src/health-check/health_checker.hpp | 14 +++++++++-----
 src/launcher/default_executor.cpp   | 29 +++++++++++------------------
 src/launcher/executor.cpp           | 21 ++++++---------------
 5 files changed, 40 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/docker/executor.cpp
----------------------------------------------------------------------
diff --git a/src/docker/executor.cpp b/src/docker/executor.cpp
index e9d65ef..94e116f 100644
--- a/src/docker/executor.cpp
+++ b/src/docker/executor.cpp
@@ -32,6 +32,7 @@
 #include <stout/error.hpp>
 #include <stout/flags.hpp>
 #include <stout/json.hpp>
+#include <stout/lambda.hpp>
 #include <stout/os.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/try.hpp>
@@ -277,19 +278,7 @@ public:
   void error(ExecutorDriver* driver, const string& message) {}
 
 protected:
-  virtual void initialize()
-  {
-    install<TaskHealthStatus>(
-        &Self::taskHealthUpdated,
-        &TaskHealthStatus::task_id,
-        &TaskHealthStatus::healthy,
-        &TaskHealthStatus::kill_task);
-  }
-
-  void taskHealthUpdated(
-      const TaskID& taskID,
-      const bool& healthy,
-      const bool& initiateTaskKill)
+  void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
     if (driver.isNone()) {
       return;
@@ -302,11 +291,11 @@ protected:
     }
 
     cout << "Received task health update, healthy: "
-         << stringify(healthy) << endl;
+         << stringify(healthStatus.healthy()) << endl;
 
     TaskStatus status;
-    status.mutable_task_id()->CopyFrom(taskID);
-    status.set_healthy(healthy);
+    status.mutable_task_id()->CopyFrom(healthStatus.task_id());
+    status.set_healthy(healthStatus.healthy());
     status.set_state(TASK_RUNNING);
 
     if (containerNetworkInfo.isSome()) {
@@ -316,9 +305,9 @@ protected:
 
     driver.get()->sendStatusUpdate(status);
 
-    if (initiateTaskKill) {
+    if (healthStatus.kill_task()) {
       killedByHealthCheck = true;
-      killTask(driver.get(), taskID);
+      killTask(driver.get(), healthStatus.task_id());
     }
   }
 
@@ -540,7 +529,7 @@ private:
       health::HealthChecker::create(
           healthCheck,
           launcherDir,
-          self(),
+          defer(self(), &Self::taskHealthUpdated, lambda::_1),
           task.task_id(),
           containerPid,
           namespaces);

http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index b769ecd..7cdf5df 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -61,7 +61,6 @@ using process::Future;
 using process::Owned;
 using process::Subprocess;
 using process::Time;
-using process::UPID;
 
 using std::map;
 using std::string;
@@ -120,7 +119,7 @@ pid_t cloneWithSetns(
 Try<Owned<HealthChecker>> HealthChecker::create(
     const HealthCheck& check,
     const string& launcherDir,
-    const UPID& executor,
+    const lambda::function<void(const TaskHealthStatus&)>& callback,
     const TaskID& taskID,
     Option<pid_t> taskPid,
     const vector<string>& namespaces)
@@ -134,7 +133,7 @@ Try<Owned<HealthChecker>> HealthChecker::create(
   Owned<HealthCheckerProcess> process(new HealthCheckerProcess(
       check,
       launcherDir,
-      executor,
+      callback,
       taskID,
       taskPid,
       namespaces));
@@ -169,15 +168,15 @@ void HealthChecker::stop()
 HealthCheckerProcess::HealthCheckerProcess(
     const HealthCheck& _check,
     const string& _launcherDir,
-    const UPID& _executor,
+    const lambda::function<void(const TaskHealthStatus&)>& _callback,
     const TaskID& _taskID,
     Option<pid_t> _taskPid,
     const vector<string>& _namespaces)
   : ProcessBase(process::ID::generate("health-checker")),
     check(_check),
+    healthUpdateCallback(_callback),
     launcherDir(_launcherDir),
     initializing(true),
-    executor(_executor),
     taskID(_taskID),
     taskPid(_taskPid),
     namespaces(_namespaces),
@@ -243,7 +242,7 @@ void HealthCheckerProcess::failure(const string& message)
   // We assume this is a local send, i.e. the health checker library
   // is not used in a binary external to the executor and hence can
   // not exit before the data is sent to the executor.
-  send(executor, taskHealthStatus);
+  healthUpdateCallback(taskHealthStatus);
 
   // Even if we set the `kill_task` flag, it is an executor who kills the task
   // and honors the flag (or not). We have no control over the task's lifetime,
@@ -262,7 +261,7 @@ void HealthCheckerProcess::success()
     TaskHealthStatus taskHealthStatus;
     taskHealthStatus.set_healthy(true);
     taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
-    send(executor, taskHealthStatus);
+    healthUpdateCallback(taskHealthStatus);
     initializing = false;
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index bd7b753..8d410a8 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -31,6 +31,7 @@
 #include <process/time.hpp>
 
 #include <stout/duration.hpp>
+#include <stout/lambda.hpp>
 #include <stout/nothing.hpp>
 
 #include "messages/messages.hpp"
@@ -50,20 +51,23 @@ public:
    * checking starts immediately after initialization.
    *
    * @param check The protobuf message definition of health check.
-   * @param executor The executor UPID to which health check results will be
-   *     reported.
    * @param launcherDir A directory where Mesos helper binaries are located.
+   * @param callback A callback HealthChecker uses to send health status
+   *     updates to its owner (usually an executor).
    * @param taskID The TaskID of the target task.
    * @param taskPid The target task's pid used to enter the specified
    *     namespaces.
    * @param namespaces The namespaces to enter prior performing a single health
    *     check.
    * @return A `HealthChecker` object or an error if `create` fails.
+   *
+   * @todo A better approach would be to return a stream of updates, e.g.,
+   * `process::Stream<TaskHealthStatus>` rather than invoking a callback.
    */
   static Try<process::Owned<HealthChecker>> create(
       const HealthCheck& check,
       const std::string& launcherDir,
-      const process::UPID& executor,
+      const lambda::function<void(const TaskHealthStatus&)>& callback,
       const TaskID& taskID,
       Option<pid_t> taskPid,
       const std::vector<std::string>& namespaces);
@@ -88,7 +92,7 @@ public:
   HealthCheckerProcess(
       const HealthCheck& _check,
       const std::string& _launcherDir,
-      const process::UPID& _executor,
+      const lambda::function<void(const TaskHealthStatus&)>& _callback,
       const TaskID& _taskID,
       Option<pid_t> _taskPid,
       const std::vector<std::string>& _namespaces);
@@ -131,9 +135,9 @@ private:
   Duration checkGracePeriod;
   Duration checkTimeout;
 
+  lambda::function<void(const TaskHealthStatus&)> healthUpdateCallback;
   std::string launcherDir;
   bool initializing;
-  process::UPID executor;
   TaskID taskID;
   Option<pid_t> taskPid;
   std::vector<std::string> namespaces;

http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/launcher/default_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/default_executor.cpp b/src/launcher/default_executor.cpp
index 99a1f5e..6e377d4 100644
--- a/src/launcher/default_executor.cpp
+++ b/src/launcher/default_executor.cpp
@@ -37,6 +37,7 @@
 
 #include <stout/flags.hpp>
 #include <stout/fs.hpp>
+#include <stout/lambda.hpp>
 #include <stout/linkedhashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -201,12 +202,6 @@ public:
 protected:
   virtual void initialize()
   {
-    install<TaskHealthStatus>(
-        &Self::taskHealthUpdated,
-        &TaskHealthStatus::task_id,
-        &TaskHealthStatus::healthy,
-        &TaskHealthStatus::kill_task);
-
     mesos.reset(new Mesos(
         contentType,
         defer(self(), &Self::connected),
@@ -407,7 +402,7 @@ protected:
           health::HealthChecker::create(
               task.health_check(),
               launcherDirectory,
-              self(),
+              defer(self(), &Self::taskHealthUpdated, lambda::_1),
               taskId,
               None(),
               vector<string>());
@@ -830,27 +825,25 @@ protected:
     shutdown();
   }
 
-  void taskHealthUpdated(
-      const TaskID& taskId,
-      bool healthy,
-      bool initiateTaskKill)
+  void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
     // This prevents us from sending `TASK_RUNNING` after a terminal status
     // update, because we may receive an update from a health check scheduled
     // before the task has been waited on.
-    if (!checkers.contains(taskId)) {
+    if (!checkers.contains(healthStatus.task_id())) {
       return;
     }
 
-    LOG(INFO) << "Received task health update for task '" << taskId
-              << "', task is "
-              << (healthy ? "healthy" : "not healthy");
+    LOG(INFO) << "Received task health update for task"
+              << " '" << healthStatus.task_id() << "', task is "
+              << (healthStatus.healthy() ? "healthy" : "not healthy");
 
-    update(taskId, TASK_RUNNING, None(), healthy);
+    update(
+        healthStatus.task_id(), TASK_RUNNING, None(), healthStatus.healthy());
 
-    if (initiateTaskKill) {
+    if (healthStatus.kill_task()) {
       unhealthy = true;
-      killTask(taskId);
+      killTask(healthStatus.task_id());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/14b5e94d/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 38fe4fb..cd5aa46 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -259,12 +259,6 @@ public:
 protected:
   virtual void initialize()
   {
-    install<TaskHealthStatus>(
-        &CommandExecutor::taskHealthUpdated,
-        &TaskHealthStatus::task_id,
-        &TaskHealthStatus::healthy,
-        &TaskHealthStatus::kill_task);
-
     Option<string> value = os::getenv("MESOS_HTTP_COMMAND_EXECUTOR");
 
     // We initialize the library here to ensure that callbacks are only invoked
@@ -297,10 +291,7 @@ protected:
     }
   }
 
-  void taskHealthUpdated(
-      const TaskID& _taskId,
-      const bool healthy,
-      const bool initiateTaskKill)
+  void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
     // This check prevents us from sending `TASK_RUNNING` updates
     // after the task has been transitioned to `TASK_KILLING`.
@@ -309,13 +300,13 @@ protected:
     }
 
     cout << "Received task health update, healthy: "
-         << stringify(healthy) << endl;
+         << stringify(healthStatus.healthy()) << endl;
 
-    update(_taskId, TASK_RUNNING, healthy);
+    update(healthStatus.task_id(), TASK_RUNNING, healthStatus.healthy());
 
-    if (initiateTaskKill) {
+    if (healthStatus.kill_task()) {
       killedByHealthCheck = true;
-      kill(_taskId);
+      kill(healthStatus.task_id());
     }
   }
 
@@ -454,7 +445,7 @@ protected:
         health::HealthChecker::create(
             task->health_check(),
             launcherDir,
-            self(),
+            defer(self(), &Self::taskHealthUpdated, lambda::_1),
             task->task_id(),
             pid,
             namespaces);