You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2017/03/15 21:21:03 UTC

[02/13] mesos git commit: Added support for general checks to command executor.

Added support for general checks to command executor.

Review: https://reviews.apache.org/r/56212


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/11160d67
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/11160d67
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/11160d67

Branch: refs/heads/master
Commit: 11160d6716d03865d2a74bad341d7ed33849385f
Parents: e60b57b
Author: Alexander Rukletsov <al...@apache.org>
Authored: Tue Feb 28 16:14:14 2017 +0100
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Mar 15 22:20:20 2017 +0100

----------------------------------------------------------------------
 src/launcher/executor.cpp | 129 ++++++++++++++++++++++++++++++++++++-----
 1 file changed, 116 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/11160d67/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index a32ec5b..bd3c0cf 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -62,6 +62,7 @@
 #include <stout/os/kill.hpp>
 #include <stout/os/killtree.hpp>
 
+#include "checks/checker.hpp"
 #include "checks/health_checker.hpp"
 
 #include "common/http.hpp"
@@ -292,6 +293,41 @@ protected:
     }
   }
 
+  void taskCheckUpdated(
+      const TaskID& _taskId,
+      const CheckStatusInfo& checkStatus)
+  {
+    CHECK_SOME(taskId);
+    CHECK_EQ(taskId.get(), _taskId);
+
+    // This prevents us from sending check updates after a terminal
+    // status update, because we may receive an update from a check
+    // scheduled before the task has been reaped.
+    //
+    // TODO(alexr): Consider sending check updates after TASK_KILLING.
+    if (killed || terminated) {
+      return;
+    }
+
+    cout << "Received check update" << endl;
+
+    // Use the previous task status to preserve all attached information.
+    CHECK_SOME(lastTaskStatus);
+    TaskStatus status = protobuf::createTaskStatus(
+        lastTaskStatus.get(),
+        UUID::random(),
+        Clock::now().secs(),
+        None(),
+        None(),
+        None(),
+        TaskStatus::REASON_TASK_CHECK_STATUS_UPDATED,
+        None(),
+        None(),
+        checkStatus);
+
+    forward(status);
+  }
+
   void taskHealthUpdated(const TaskHealthStatus& healthStatus)
   {
     CHECK_SOME(taskId);
@@ -473,6 +509,35 @@ protected:
 
     cout << "Forked command at " << pid << endl;
 
+    if (task.has_check()) {
+      vector<string> namespaces;
+      if (rootfs.isSome() &&
+          task.check().type() == CheckInfo::COMMAND) {
+        // Make sure command checks are run from the task's mount namespace.
+        // Otherwise if rootfs is specified the command binary may not be
+        // available in the executor.
+        //
+        // NOTE: The command executor shares the network namespace
+        // with its task, hence no need to enter it explicitly.
+        namespaces.push_back("mnt");
+      }
+
+      Try<Owned<checks::Checker>> _checker =
+        checks::Checker::create(
+            task.check(),
+            defer(self(), &Self::taskCheckUpdated, taskId.get(), lambda::_1),
+            taskId.get(),
+            pid,
+            namespaces);
+
+      if (_checker.isError()) {
+        // TODO(alexr): Consider ABORT and return a TASK_FAILED here.
+        cerr << "Failed to create checker: " << _checker.error() << endl;
+      } else {
+        checker = _checker.get();
+      }
+    }
+
     if (task.has_health_check()) {
       vector<string> namespaces;
       if (rootfs.isSome() &&
@@ -632,6 +697,11 @@ private:
         forward(status);
       }
 
+      // Stop checking the task.
+      if (checker.get() != nullptr) {
+        checker->stop();
+      }
+
       // Stop health checking the task.
       if (healthChecker.get() != nullptr) {
         healthChecker->stop();
@@ -672,6 +742,11 @@ private:
   {
     terminated = true;
 
+    // Stop checking the task.
+    if (checker.get() != nullptr) {
+      checker->stop();
+    }
+
     // Stop health checking the task.
     if (healthChecker.get() != nullptr) {
       healthChecker->stop();
@@ -718,8 +793,12 @@ private:
         taskId.get(),
         taskState,
         None(),
-        message,
-        (killed && killedByHealthCheck) ? Option<bool>(false) : None());
+        message);
+
+    // Indicate that a kill occurred due to a failing health check.
+    if (killed && killedByHealthCheck) {
+      status.set_healthy(false);
+    }
 
     forward(status);
 
@@ -773,19 +852,16 @@ private:
       const TaskID& _taskId,
       const TaskState& state,
       const Option<TaskStatus::Reason>& reason = None(),
-      const Option<string>& message = None(),
-      const Option<bool>& healthy = None())
+      const Option<string>& message = None())
   {
-    UUID uuid = UUID::random();
+    TaskStatus status = protobuf::createTaskStatus(
+        _taskId,
+        state,
+        UUID::random(),
+        Clock::now().secs());
 
-    TaskStatus status;
-    status.mutable_task_id()->CopyFrom(_taskId);
     status.mutable_executor_id()->CopyFrom(executorId);
-
-    status.set_state(state);
     status.set_source(TaskStatus::SOURCE_EXECUTOR);
-    status.set_uuid(uuid.toBytes());
-    status.set_timestamp(Clock::now().secs());
 
     if (reason.isSome()) {
       status.set_reason(reason.get());
@@ -795,8 +871,34 @@ private:
       status.set_message(message.get());
     }
 
-    if (healthy.isSome()) {
-      status.set_healthy(healthy.get());
+    // TODO(alexr): Augment health information in a way similar to
+    // `CheckStatusInfo`. See MESOS-6417 for more details.
+
+    // If a check for the task has been defined, `check_status` field in each
+    // task status must be set to a valid `CheckStatusInfo` message even if
+    // there is no check status available yet.
+    CHECK(taskData.isSome());
+    if (taskData->taskInfo.has_check()) {
+      CheckStatusInfo checkStatusInfo;
+      checkStatusInfo.set_type(taskData->taskInfo.check().type());
+      switch (taskData->taskInfo.check().type()) {
+        case CheckInfo::COMMAND: {
+          checkStatusInfo.mutable_command();
+          break;
+        }
+
+        case CheckInfo::HTTP: {
+          checkStatusInfo.mutable_http();
+          break;
+        }
+
+        case CheckInfo::UNKNOWN: {
+          CHECK_NE(CheckInfo::UNKNOWN, taskData->taskInfo.check().type());
+          break;
+        }
+      }
+
+      status.mutable_check_status()->CopyFrom(checkStatusInfo);
     }
 
     return status;
@@ -894,6 +996,7 @@ private:
 
   Option<TaskStatus> lastTaskStatus;
 
+  Owned<checks::Checker> checker;
   Owned<checks::HealthChecker> healthChecker;
 };