You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2016/08/10 21:54:43 UTC

[2/2] mesos git commit: Moved the implementation of the health checker to the .cpp file.

Moved the implementation of the health checker to the .cpp file.

Review: https://reviews.apache.org/r/50922/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d5f3b0d1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d5f3b0d1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d5f3b0d1

Branch: refs/heads/master
Commit: d5f3b0d17cf1a485b37a6ee1537a7baff05992d9
Parents: f511a24
Author: haosdent huang <ha...@gmail.com>
Authored: Wed Aug 10 22:40:41 2016 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Aug 10 23:53:28 2016 +0200

----------------------------------------------------------------------
 src/health-check/health_checker.cpp | 225 ++++++++++++++++++++++++++++--
 src/health-check/health_checker.hpp | 227 ++-----------------------------
 2 files changed, 225 insertions(+), 227 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d5f3b0d1/src/health-check/health_checker.cpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.cpp b/src/health-check/health_checker.cpp
index 585a0b5..052c7c5 100644
--- a/src/health-check/health_checker.cpp
+++ b/src/health-check/health_checker.cpp
@@ -14,6 +14,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include "health-check/health_checker.hpp"
+
 #include <signal.h>
 #include <stdio.h>
 #include <string.h>
@@ -27,24 +29,32 @@
 
 #include <mesos/mesos.hpp>
 
-#include <process/future.hpp>
-#include <process/pid.hpp>
-#include <process/process.hpp>
-#include <process/protobuf.hpp>
+#include <process/delay.hpp>
+#include <process/owned.hpp>
+#include <process/subprocess.hpp>
 
+#include <stout/os.hpp>
 #include <stout/protobuf.hpp>
 
-#include "health-check/health_checker.hpp"
-
-using namespace mesos;
+#include "common/status_utils.hpp"
 
+using process::delay;
+using process::Clock;
+using process::Future;
+using process::NO_SETSID;
+using process::Owned;
+using process::Promise;
+using process::Subprocess;
+using process::Time;
 using process::UPID;
 
+using std::map;
+using std::string;
+using std::vector;
+
 namespace mesos {
 namespace internal {
 
-using namespace process;
-
 Try<Owned<HealthChecker>> HealthChecker::create(
     const HealthCheck& check,
     const UPID& executor,
@@ -88,5 +98,202 @@ Future<Nothing> HealthChecker::healthCheck()
   return dispatch(process.get(), &HealthCheckerProcess::healthCheck);
 }
 
+
+HealthCheckerProcess::HealthCheckerProcess(
+    const HealthCheck& _check,
+    const UPID& _executor,
+    const TaskID& _taskID)
+  : ProcessBase(process::ID::generate("health-checker")),
+    check(_check),
+    initializing(true),
+    executor(_executor),
+    taskID(_taskID),
+    consecutiveFailures(0) {}
+
+
+Future<Nothing> HealthCheckerProcess::healthCheck()
+{
+  VLOG(2) << "Health checks starting in "
+    << Seconds(check.delay_seconds()) << ", grace period "
+    << Seconds(check.grace_period_seconds());
+
+  startTime = Clock::now();
+
+  delay(Seconds(check.delay_seconds()), self(), &Self::_healthCheck);
+  return promise.future();
+}
+
+
+void HealthCheckerProcess::failure(const string& message)
+{
+  if (check.grace_period_seconds() > 0 &&
+      (Clock::now() - startTime).secs() <= check.grace_period_seconds()) {
+    LOG(INFO) << "Ignoring failure as health check still in grace period";
+    reschedule();
+    return;
+  }
+
+  consecutiveFailures++;
+  VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
+
+  bool killTask = consecutiveFailures >= check.consecutive_failures();
+
+  TaskHealthStatus taskHealthStatus;
+  taskHealthStatus.set_healthy(false);
+  taskHealthStatus.set_consecutive_failures(consecutiveFailures);
+  taskHealthStatus.set_kill_task(killTask);
+  taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
+  send(executor, taskHealthStatus);
+
+  if (killTask) {
+    // This is a hack to ensure the message is sent to the
+    // executor before we exit the process. Without this,
+    // we may exit before libprocess has sent the data over
+    // the socket. See MESOS-4111.
+    os::sleep(Seconds(1));
+    promise.fail(message);
+  } else {
+    reschedule();
+  }
+}
+
+
+void HealthCheckerProcess::success()
+{
+  VLOG(1) << "Check passed";
+
+  // Send a healthy status update on the first success,
+  // and on the first success following failure(s).
+  if (initializing || consecutiveFailures > 0) {
+    TaskHealthStatus taskHealthStatus;
+    taskHealthStatus.set_healthy(true);
+    taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
+    send(executor, taskHealthStatus);
+    initializing = false;
+  }
+
+  consecutiveFailures = 0;
+  reschedule();
+}
+
+
+void HealthCheckerProcess::_healthCheck()
+{
+  if (check.has_http()) {
+    promise.fail("HTTP health check is not supported");
+    return;
+  }
+
+  if (!check.has_command()) {
+    promise.fail("No check found in health check");
+    return;
+  }
+
+  const CommandInfo& command = check.command();
+
+  map<string, string> environment = os::environment();
+
+  foreach (const Environment::Variable& variable,
+           command.environment().variables()) {
+    environment[variable.name()] = variable.value();
+  }
+
+  // Launch the subprocess.
+  Option<Try<Subprocess>> external = None();
+
+  if (command.shell()) {
+    // Use the shell variant.
+    if (!command.has_value()) {
+      promise.fail("Shell command is not specified");
+      return;
+    }
+
+    VLOG(2) << "Launching health command '" << command.value() << "'";
+
+    external = subprocess(
+        command.value(),
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        NO_SETSID,
+        environment);
+  } else {
+    // Use the exec variant.
+    if (!command.has_value()) {
+      promise.fail("Executable path is not specified");
+      return;
+    }
+
+    vector<string> argv;
+    foreach (const string& arg, command.arguments()) {
+      argv.push_back(arg);
+    }
+
+    VLOG(2) << "Launching health command [" << command.value() << ", "
+            << strings::join(", ", argv) << "]";
+
+    external = subprocess(
+        command.value(),
+        argv,
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDERR_FILENO),
+        Subprocess::FD(STDERR_FILENO),
+        NO_SETSID,
+        None(),
+        environment);
+  }
+
+  CHECK_SOME(external);
+
+  if (external.get().isError()) {
+    failure("Error creating subprocess for healthcheck: " +
+            external.get().error());
+    return;
+  }
+
+  pid_t commandPid = external.get().get().pid();
+
+  Future<Option<int>> status = external.get().get().status();
+  status.await(Seconds(check.timeout_seconds()));
+
+  if (!status.isReady()) {
+    string msg = "Command check failed with reason: ";
+    if (status.isFailed()) {
+      msg += "failed with error: " + status.failure();
+    } else if (status.isDiscarded()) {
+      msg += "status future discarded";
+    } else {
+      msg += "status still pending after timeout " +
+             stringify(Seconds(check.timeout_seconds()));
+    }
+
+    if (commandPid != -1) {
+      // Cleanup the external command process.
+      os::killtree(commandPid, SIGKILL);
+      VLOG(1) << "Kill health check command " << commandPid;
+    }
+
+    failure(msg);
+    return;
+  }
+
+  int statusCode = status.get().get();
+  if (statusCode != 0) {
+    string message = "Health command check " + WSTRINGIFY(statusCode);
+    failure(message);
+  } else {
+    success();
+  }
+}
+
+
+void HealthCheckerProcess::reschedule()
+{
+  VLOG(1) << "Rescheduling health check in "
+    << Seconds(check.interval_seconds());
+
+  delay(Seconds(check.interval_seconds()), self(), &Self::_healthCheck);
+}
+
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/d5f3b0d1/src/health-check/health_checker.hpp
----------------------------------------------------------------------
diff --git a/src/health-check/health_checker.hpp b/src/health-check/health_checker.hpp
index b28a9cf..0cdc2de 100644
--- a/src/health-check/health_checker.hpp
+++ b/src/health-check/health_checker.hpp
@@ -17,48 +17,15 @@
 #ifndef __HEALTH_CHECKER_HPP__
 #define __HEALTH_CHECKER_HPP__
 
-#include <signal.h>
-#include <stdio.h>
-#include <string.h>
-#ifndef __WINDOWS__
-#include <unistd.h>
-#endif // __WINDOWS__
-
-#include <iostream>
 #include <string>
 
-#include <mesos/mesos.hpp>
-
-#include <process/delay.hpp>
 #include <process/future.hpp>
-#include <process/id.hpp>
 #include <process/pid.hpp>
 #include <process/process.hpp>
 #include <process/protobuf.hpp>
-#include <process/subprocess.hpp>
-
-#include <stout/duration.hpp>
-#include <stout/flags.hpp>
-#include <stout/json.hpp>
-#include <stout/option.hpp>
-#include <stout/os.hpp>
-#include <stout/protobuf.hpp>
-#include <stout/strings.hpp>
-
-#include <stout/os/killtree.hpp>
-
-#include "common/status_utils.hpp"
 
 #include "messages/messages.hpp"
 
-using std::cout;
-using std::cerr;
-using std::endl;
-using std::map;
-using std::string;
-
-using process::UPID;
-
 namespace mesos {
 namespace internal {
 
@@ -70,7 +37,7 @@ class HealthChecker
 public:
   static Try<process::Owned<HealthChecker>> create(
       const HealthCheck& check,
-      const UPID& executor,
+      const process::UPID& executor,
       const TaskID& taskID);
 
   ~HealthChecker();
@@ -89,202 +56,26 @@ class HealthCheckerProcess : public ProtobufProcess<HealthCheckerProcess>
 public:
   HealthCheckerProcess(
       const HealthCheck& _check,
-      const UPID& _executor,
-      const TaskID& _taskID)
-    : ProcessBase(process::ID::generate("health-checker")),
-      check(_check),
-      initializing(true),
-      executor(_executor),
-      taskID(_taskID),
-      consecutiveFailures(0) {}
+      const process::UPID& _executor,
+      const TaskID& _taskID);
 
   virtual ~HealthCheckerProcess() {}
 
-  process::Future<Nothing> healthCheck()
-  {
-    VLOG(2) << "Health checks starting in "
-      << Seconds(check.delay_seconds()) << ", grace period "
-      << Seconds(check.grace_period_seconds());
-
-    startTime = process::Clock::now();
-
-    delay(Seconds(check.delay_seconds()), self(), &Self::_healthCheck);
-    return promise.future();
-  }
+  process::Future<Nothing> healthCheck();
 
 private:
-  void failure(const string& message)
-  {
-    if (check.grace_period_seconds() > 0 &&
-        (process::Clock::now() - startTime).secs() <=
-          check.grace_period_seconds()) {
-      LOG(INFO) << "Ignoring failure as health check still in grace period";
-      reschedule();
-      return;
-    }
-
-    consecutiveFailures++;
-    VLOG(1) << "#" << consecutiveFailures << " check failed: " << message;
-
-    bool killTask = consecutiveFailures >= check.consecutive_failures();
-
-    TaskHealthStatus taskHealthStatus;
-    taskHealthStatus.set_healthy(false);
-    taskHealthStatus.set_consecutive_failures(consecutiveFailures);
-    taskHealthStatus.set_kill_task(killTask);
-    taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
-    send(executor, taskHealthStatus);
-
-    if (killTask) {
-      // This is a hack to ensure the message is sent to the
-      // executor before we exit the process. Without this,
-      // we may exit before libprocess has sent the data over
-      // the socket. See MESOS-4111.
-      os::sleep(Seconds(1));
-      promise.fail(message);
-    } else {
-      reschedule();
-    }
-  }
-
-  void success()
-  {
-    VLOG(1) << "Check passed";
-
-    // Send a healthy status update on the first success,
-    // and on the first success following failure(s).
-    if (initializing || consecutiveFailures > 0) {
-      TaskHealthStatus taskHealthStatus;
-      taskHealthStatus.set_healthy(true);
-      taskHealthStatus.mutable_task_id()->CopyFrom(taskID);
-      send(executor, taskHealthStatus);
-      initializing = false;
-    }
-    consecutiveFailures = 0;
-    reschedule();
-  }
-
-  void _healthCheck()
-  {
-    if (check.has_http()) {
-      promise.fail("HTTP health check is not supported");
-      return;
-    }
-
-    if (!check.has_command()) {
-      promise.fail("No check found in health check");
-      return;
-    }
-
-    const CommandInfo& command = check.command();
-
-    map<string, string> environment = os::environment();
-
-    foreach (const Environment::Variable& variable,
-             command.environment().variables()) {
-      environment[variable.name()] = variable.value();
-    }
-
-    // Launch the subprocess.
-    Option<Try<process::Subprocess>> external = None();
-
-    if (command.shell()) {
-      // Use the shell variant.
-      if (!command.has_value()) {
-        promise.fail("Shell command is not specified");
-        return;
-      }
-
-      VLOG(2) << "Launching health command '" << command.value() << "'";
-
-      external = process::subprocess(
-          command.value(),
-          process::Subprocess::PATH("/dev/null"),
-          process::Subprocess::FD(STDERR_FILENO),
-          process::Subprocess::FD(STDERR_FILENO),
-          process::NO_SETSID,
-          environment);
-    } else {
-      // Use the exec variant.
-      if (!command.has_value()) {
-        promise.fail("Executable path is not specified");
-        return;
-      }
-
-      vector<string> argv;
-      foreach (const string& arg, command.arguments()) {
-        argv.push_back(arg);
-      }
-
-      VLOG(2) << "Launching health command [" << command.value() << ", "
-              << strings::join(", ", argv) << "]";
-
-      external = process::subprocess(
-          command.value(),
-          argv,
-          process::Subprocess::PATH("/dev/null"),
-          process::Subprocess::FD(STDERR_FILENO),
-          process::Subprocess::FD(STDERR_FILENO),
-          process::NO_SETSID,
-          None(),
-          environment);
-    }
-
-    CHECK_SOME(external);
-
-    if (external.get().isError()) {
-      failure("Error creating subprocess for healthcheck: " +
-              external.get().error());
-      return;
-    }
-
-    pid_t commandPid = external.get().get().pid();
-
-    process::Future<Option<int>> status = external.get().get().status();
-    status.await(Seconds(check.timeout_seconds()));
-
-    if (!status.isReady()) {
-      string msg = "Command check failed with reason: ";
-      if (status.isFailed()) {
-        msg += "failed with error: " + status.failure();
-      } else if (status.isDiscarded()) {
-        msg += "status future discarded";
-      } else {
-        msg += "status still pending after timeout " +
-               stringify(Seconds(check.timeout_seconds()));
-      }
-
-      if (commandPid != -1) {
-        // Cleanup the external command process.
-        os::killtree(commandPid, SIGKILL);
-        VLOG(1) << "Kill health check command " << commandPid;
-      }
-
-      failure(msg);
-      return;
-    }
+  void failure(const std::string& message);
 
-    int statusCode = status.get().get();
-    if (statusCode != 0) {
-      string message = "Health command check " + WSTRINGIFY(statusCode);
-      failure(message);
-    } else {
-      success();
-    }
-  }
+  void success();
 
-  void reschedule()
-  {
-    VLOG(1) << "Rescheduling health check in "
-      << Seconds(check.interval_seconds());
+  void _healthCheck();
 
-    delay(Seconds(check.interval_seconds()), self(), &Self::_healthCheck);
-  }
+  void reschedule();
 
   process::Promise<Nothing> promise;
   HealthCheck check;
   bool initializing;
-  UPID executor;
+  process::UPID executor;
   TaskID taskID;
   uint32_t consecutiveFailures;
   process::Time startTime;