You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/14 01:27:48 UTC

[1/4] mesos git commit: Updated tests for HTTP command executor.

Repository: mesos
Updated Branches:
  refs/heads/master e492b5427 -> 124a05b4f


Updated tests for HTTP command executor.

Review: https://reviews.apache.org/r/45670/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/124a05b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/124a05b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/124a05b4

Branch: refs/heads/master
Commit: 124a05b4fe650ddd1ff08e2a616fe4ee4ba8bd5f
Parents: df24b67
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:34 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 src/tests/command_executor_tests.cpp | 30 +++++++++++++++++++++++++-----
 1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/124a05b4/src/tests/command_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/command_executor_tests.cpp b/src/tests/command_executor_tests.cpp
index 843233a..07e5eb4 100644
--- a/src/tests/command_executor_tests.cpp
+++ b/src/tests/command_executor_tests.cpp
@@ -47,6 +47,8 @@ using process::PID;
 
 using std::vector;
 
+using ::testing::WithParamInterface;
+
 namespace mesos {
 namespace internal {
 namespace tests {
@@ -54,18 +56,32 @@ namespace tests {
 // Tests that exercise the command executor implementation
 // should be located in this file.
 
-class CommandExecutorTest : public MesosTest {};
+class CommandExecutorTest
+  : public MesosTest,
+    public WithParamInterface<bool> {};
+
+
+// The Command Executor tests are parameterized by the underlying library
+// used by the executor (e.g., driver or the HTTP based executor library).
+INSTANTIATE_TEST_CASE_P(
+    HTTPCommandExecutor,
+    CommandExecutorTest,
+    ::testing::Bool());
 
 
 // This test ensures that the command executor does not send
 // TASK_KILLING to frameworks that do not support the capability.
-TEST_F(CommandExecutorTest, NoTaskKillingCapability)
+TEST_P(CommandExecutorTest, NoTaskKillingCapability)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.http_command_executor = GetParam();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
   ASSERT_SOME(slave);
 
   // Start the framework without the task killing capability.
@@ -117,13 +133,17 @@ TEST_F(CommandExecutorTest, NoTaskKillingCapability)
 
 // This test ensures that the command executor sends TASK_KILLING
 // to frameworks that support the capability.
-TEST_F(CommandExecutorTest, TaskKillingCapability)
+TEST_P(CommandExecutorTest, TaskKillingCapability)
 {
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.http_command_executor = GetParam();
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
   ASSERT_SOME(slave);
 
   // Start the framework with the task killing capability.


[3/4] mesos git commit: Added --http_command_executor flag.

Posted by vi...@apache.org.
Added --http_command_executor flag.

Review: https://reviews.apache.org/r/44427/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df24b670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df24b670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df24b670

Branch: refs/heads/master
Commit: df24b6700aa79448782eab6277039d9129f9a17e
Parents: bec4d71
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:29 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 docs/configuration.md | 13 +++++++++++++
 src/slave/flags.cpp   | 10 ++++++++++
 src/slave/flags.hpp   |  1 +
 src/slave/slave.cpp   | 28 ++++++++++++++++++++++++----
 4 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index ba00ec5..ce51f26 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1290,6 +1290,19 @@ Example:
 </tr>
 <tr>
   <td>
+    --[no-]http_command_executor
+  </td>
+  <td>
+The underlying executor library to be used for the command executor.
+If set to <code>true</code>, the command executor would use the HTTP based
+executor library to interact with the Mesos agent. If set to <code>false</code>,
+the driver based implementation would be used.
+<b>NOTE</b>: This flag is *experimental* and should not be used in
+production yet. (default: false)
+  </td>
+</tr>
+<tr>
+  <td>
     --image_providers=VALUE
   </td>
   <td>

http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index dd7bc9a..316feec 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -783,4 +783,14 @@ mesos::internal::slave::Flags::Flags()
       "The ranges of XFS project IDs to use for tracking directory quotas",
       "[5000-10000]");
 #endif
+
+  add(&Flags::http_command_executor,
+      "http_command_executor",
+      "The underlying executor library to be used for the command executor.\n"
+      "If set to `true`, the command executor would use the HTTP based\n"
+      "executor library to interact with the Mesos agent. If set to `false`,\n"
+      "the driver based implementation would be used.\n"
+      "NOTE: This flag is *experimental* and should not be used in\n"
+      "production yet.",
+      false);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 300db49..ee520ac 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -147,6 +147,7 @@ public:
 #if ENABLE_XFS_DISK_ISOLATOR
   std::string xfs_project_range;
 #endif
+  bool http_command_executor;
 };
 
 } // namespace slave {

http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ee277d..49fa4a0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3898,8 +3898,14 @@ ExecutorInfo Slave::getExecutorInfo(
       executor.mutable_command()->set_user(task.command().user());
     }
 
-    Result<string> path =
-      os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
+    Result<string> path = None();
+    if (flags.http_command_executor) {
+      path =
+        os::realpath(path::join(flags.launcher_dir, "mesos-http-executor"));
+    } else {
+      path =
+        os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
+    }
 
     // Explicitly set 'shell' to true since we want to use the shell
     // for running the mesos-executor (and even though this is the
@@ -3909,7 +3915,11 @@ ExecutorInfo Slave::getExecutorInfo(
     if (path.isSome()) {
       if (hasRootfs) {
         executor.mutable_command()->set_shell(false);
-        executor.mutable_command()->add_arguments("mesos-executor");
+        if (flags.http_command_executor) {
+          executor.mutable_command()->add_arguments("mesos-http-executor");
+        } else {
+          executor.mutable_command()->add_arguments("mesos-executor");
+        }
         executor.mutable_command()->add_arguments(
             "--sandbox_directory=" + flags.sandbox_directory);
 
@@ -5864,12 +5874,22 @@ Executor::Executor(
 {
   CHECK_NOTNULL(slave);
 
+  // See if this is driver based command executor.
   Result<string> executorPath =
     os::realpath(path::join(slave->flags.launcher_dir, "mesos-executor"));
 
   if (executorPath.isSome()) {
     commandExecutor =
-      strings::contains(info.command().value(), executorPath.get());
+        strings::contains(info.command().value(), executorPath.get());
+  }
+
+  // See if this is HTTP based command executor.
+  if (!commandExecutor) {
+    executorPath = os::realpath(
+        path::join(slave->flags.launcher_dir, "mesos-http-executor"));
+
+    commandExecutor =
+        strings::contains(info.command().value(), executorPath.get());
   }
 }
 


[2/4] mesos git commit: Updated http_command_executor.cpp to use v1 API.

Posted by vi...@apache.org.
Updated http_command_executor.cpp to use v1 API.

Review: https://reviews.apache.org/r/44424/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bec4d711
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bec4d711
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bec4d711

Branch: refs/heads/master
Commit: bec4d711c3fc190138d100023bba9e3fe087052e
Parents: ed30403
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:25 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 src/launcher/http_command_executor.cpp | 539 +++++++++++++++-------------
 1 file changed, 295 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bec4d711/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
index 7677391..ad484e0 100644
--- a/src/launcher/http_command_executor.cpp
+++ b/src/launcher/http_command_executor.cpp
@@ -24,7 +24,9 @@
 #include <string>
 #include <vector>
 
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
 #include <mesos/type_utils.hpp>
 
 #include <process/defer.hpp>
@@ -41,6 +43,7 @@
 #include <stout/flags.hpp>
 #include <stout/json.hpp>
 #include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
 #include <stout/path.hpp>
@@ -50,6 +53,8 @@
 #include "common/http.hpp"
 #include "common/status_utils.hpp"
 
+#include "internal/evolve.hpp"
+
 #ifdef __linux__
 #include "linux/fs.hpp"
 #endif
@@ -60,101 +65,230 @@
 
 #include "slave/constants.hpp"
 
-using namespace mesos::internal::slave;
+#ifdef __linux__
+namespace fs = mesos::internal::fs;
+#endif
 
-using process::wait; // Necessary on some OS's to disambiguate.
+using namespace mesos::internal::slave;
 
 using std::cout;
 using std::cerr;
 using std::endl;
+using std::queue;
 using std::string;
 using std::vector;
 
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+using process::Timer;
+
+using mesos::internal::evolve;
+using mesos::internal::TaskHealthStatus;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
 namespace mesos {
+namespace v1 {
 namespace internal {
 
-using namespace process;
-
-class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor>
 {
 public:
-  CommandExecutorProcess(
-      const Option<char**>& override,
+  HttpCommandExecutor(
+      const Option<char**>& _override,
       const string& _healthCheckDir,
       const Option<string>& _sandboxDirectory,
       const Option<string>& _workingDirectory,
       const Option<string>& _user,
       const Option<string>& _taskCommand,
+      const FrameworkID& _frameworkId,
+      const ExecutorID& _executorId,
       const Duration& _shutdownGracePeriod)
-    : state(REGISTERING),
-      launched(false),
-      killed(false),
-      killedByHealthCheck(false),
-      pid(-1),
-      healthPid(-1),
-      shutdownGracePeriod(_shutdownGracePeriod),
-      driver(None()),
-      frameworkInfo(None()),
-      taskId(None()),
-      healthCheckDir(_healthCheckDir),
-      override(override),
-      sandboxDirectory(_sandboxDirectory),
-      workingDirectory(_workingDirectory),
-      user(_user),
-      taskCommand(_taskCommand) {}
-
-  virtual ~CommandExecutorProcess() {}
-
-  void registered(
-      ExecutorDriver* _driver,
-      const ExecutorInfo& _executorInfo,
-      const FrameworkInfo& _frameworkInfo,
-      const SlaveInfo& slaveInfo)
+  : state(DISCONNECTED),
+    launched(false),
+    killed(false),
+    killedByHealthCheck(false),
+    pid(-1),
+    healthPid(-1),
+    shutdownGracePeriod(_shutdownGracePeriod),
+    frameworkInfo(None()),
+    taskId(None()),
+    healthCheckDir(_healthCheckDir),
+    override(_override),
+    sandboxDirectory(_sandboxDirectory),
+    workingDirectory(_workingDirectory),
+    user(_user),
+    taskCommand(_taskCommand),
+    frameworkId(_frameworkId),
+    executorId(_executorId),
+    task(None()) {}
+
+  virtual ~HttpCommandExecutor() = default;
+
+  void connected()
   {
-    CHECK_EQ(REGISTERING, state);
+    state = CONNECTED;
+
+    doReliableRegistration();
+  }
+
+  void disconnected()
+  {
+    state = DISCONNECTED;
+  }
+
+  void received(queue<Event> events)
+  {
+    while (!events.empty()) {
+      Event event = events.front();
+      events.pop();
+
+      cout << "Received " << event.type() << " event" << endl;
+
+      switch (event.type()) {
+        case Event::SUBSCRIBED: {
+          cout << "Subscribed executor on "
+               << event.subscribed().agent_info().hostname() << endl;
+
+          frameworkInfo = event.subscribed().framework_info();
+          state = SUBSCRIBED;
+          break;
+        }
 
-    cout << "Registered executor on " << slaveInfo.hostname() << endl;
+        case Event::LAUNCH: {
+          launch(event.launch().task());
+          break;
+        }
 
-    driver = _driver;
-    frameworkInfo = _frameworkInfo;
+        case Event::KILL: {
+          kill(event.kill().task_id());
+          break;
+        }
 
-    state = REGISTERED;
+        case Event::ACKNOWLEDGED: {
+          // Remove the corresponding update.
+          updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+          // Remove the corresponding task.
+          task = None();
+          break;
+        }
+
+        case Event::SHUTDOWN: {
+          shutdown();
+          break;
+        }
+
+        case Event::MESSAGE: {
+          break;
+        }
+
+        case Event::ERROR: {
+          cerr << "Error: " << event.error().message() << endl;
+        }
+
+        default: {
+          UNREACHABLE();
+        }
+      }
+    }
   }
 
-  void reregistered(
-      ExecutorDriver* driver,
-      const SlaveInfo& slaveInfo)
+protected:
+  virtual void initialize()
   {
-    CHECK(state == REGISTERED || state == REGISTERING) << state;
+    // TODO(qianzhang): Currently, the `mesos-health-check` binary can only
+    // send unversioned `TaskHealthStatus` messages. This needs to be revisited
+    // as part of MESOS-5103.
+    install<TaskHealthStatus>(
+        &HttpCommandExecutor::taskHealthUpdated,
+        &TaskHealthStatus::task_id,
+        &TaskHealthStatus::healthy,
+        &TaskHealthStatus::kill_task);
 
-    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+    // We initialize the library here to ensure that callbacks are only invoked
+    // after the process has spawned.
+    mesos.reset(new Mesos(
+        mesos::ContentType::PROTOBUF,
+        defer(self(), &Self::connected),
+        defer(self(), &Self::disconnected),
+        defer(self(), &Self::received, lambda::_1)));
+  }
 
-    state = REGISTERED;
+  void taskHealthUpdated(
+      const mesos::TaskID& taskID,
+      const bool healthy,
+      const bool initiateTaskKill)
+  {
+    cout << "Received task health update, healthy: "
+         << stringify(healthy) << endl;
+
+    update(evolve(taskID), TASK_RUNNING, healthy);
+
+    if (initiateTaskKill) {
+      killedByHealthCheck = true;
+      kill(evolve(taskID));
+    }
   }
 
-  void disconnected(ExecutorDriver* driver) {}
+  void doReliableRegistration()
+  {
+    if (state == SUBSCRIBED || state == DISCONNECTED) {
+      return;
+    }
+
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
 
-  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+
+    // Send all unacknowledged updates.
+    foreach (const Call::Update& update, updates.values()) {
+      subscribe->add_unacknowledged_updates()->MergeFrom(update);
+    }
+
+    // Send the unacknowledged task.
+    if (task.isSome()) {
+      subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
+    }
+
+    mesos->send(call);
+
+    delay(Seconds(1), self(), &Self::doReliableRegistration);
+  }
+
+  void launch(const TaskInfo& _task)
   {
-    CHECK_EQ(REGISTERED, state);
+    CHECK_EQ(SUBSCRIBED, state);
 
     if (launched) {
-      TaskStatus status;
-      status.mutable_task_id()->MergeFrom(task.task_id());
-      status.set_state(TASK_FAILED);
-      status.set_message(
+      update(
+          _task.task_id(),
+          TASK_FAILED,
+          None(),
           "Attempted to run multiple tasks using a \"command\" executor");
-
-      driver->sendStatusUpdate(status);
       return;
     }
 
+    // Capture the task.
+    task = _task;
+
     // Capture the TaskID.
-    taskId = task.task_id();
+    taskId = task->task_id();
 
     // Capture the kill policy.
-    if (task.has_kill_policy()) {
-      killPolicy = task.kill_policy();
+    if (task->has_kill_policy()) {
+      killPolicy = task->kill_policy();
     }
 
     // Determine the command to launch the task.
@@ -175,11 +309,11 @@ public:
       }
 
       command = parse.get();
-    } else if (task.has_command()) {
-      command = task.command();
+    } else if (task->has_command()) {
+      command = task->command();
     } else {
       CHECK_SOME(override)
-        << "Expecting task '" << task.task_id()
+        << "Expecting task '" << task->task_id()
         << "' to have a command!";
     }
 
@@ -191,16 +325,16 @@ public:
       // correct solution is to perform this validation at master side.
       if (command.shell()) {
         CHECK(command.has_value())
-          << "Shell command of task '" << task.task_id()
+          << "Shell command of task '" << task->task_id()
           << "' is not specified!";
       } else {
         CHECK(command.has_value())
-          << "Executable of task '" << task.task_id()
+          << "Executable of task '" << task->task_id()
           << "' is not specified!";
       }
     }
 
-    cout << "Starting task " << task.task_id() << endl;
+    cout << "Starting task " << task->task_id() << endl;
 
     // TODO(benh): Clean this up with the new 'Fork' abstraction.
     // Use pipes to determine which child has successfully changed
@@ -448,25 +582,22 @@ public:
 
     cout << "Forked command at " << pid << endl;
 
-    if (task.has_health_check()) {
-      launchHealthCheck(task);
+    if (task->has_health_check()) {
+      launchHealthCheck(task.get());
     }
 
     // Monitor this process.
     process::reap(pid)
-      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+      .onAny(defer(self(), &Self::reaped, pid, lambda::_1));
 
-    TaskStatus status;
-    status.mutable_task_id()->MergeFrom(task.task_id());
-    status.set_state(TASK_RUNNING);
-    driver->sendStatusUpdate(status);
+    update(task->task_id(), TASK_RUNNING);
 
     launched = true;
   }
 
-  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  void kill(const TaskID& taskId)
   {
-    cout << "Received killTask for task " << taskId.value() << endl;
+    cout << "Received kill for task " << taskId.value() << endl;
 
     // Default grace period is set to 3s for backwards compatibility.
     //
@@ -478,12 +609,10 @@ public:
       gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
     }
 
-    killTask(driver, taskId, gracePeriod);
+    kill(taskId, gracePeriod);
   }
 
-  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
-
-  void shutdown(ExecutorDriver* driver)
+  void shutdown()
   {
     cout << "Shutting down" << endl;
 
@@ -504,53 +633,12 @@ public:
     // agent is smaller than the kill grace period).
     if (launched) {
       CHECK_SOME(taskId);
-      killTask(driver, taskId.get(), gracePeriod);
-    } else {
-      driver->stop();
-    }
-  }
-
-  virtual void error(ExecutorDriver* driver, const string& message) {}
-
-protected:
-  virtual void initialize()
-  {
-    install<TaskHealthStatus>(
-        &CommandExecutorProcess::taskHealthUpdated,
-        &TaskHealthStatus::task_id,
-        &TaskHealthStatus::healthy,
-        &TaskHealthStatus::kill_task);
-  }
-
-  void taskHealthUpdated(
-      const TaskID& taskID,
-      const bool& healthy,
-      const bool& initiateTaskKill)
-  {
-    if (driver.isNone()) {
-      return;
-    }
-
-    cout << "Received task health update, healthy: "
-         << stringify(healthy) << endl;
-
-    TaskStatus status;
-    status.mutable_task_id()->CopyFrom(taskID);
-    status.set_healthy(healthy);
-    status.set_state(TASK_RUNNING);
-    driver.get()->sendStatusUpdate(status);
-
-    if (initiateTaskKill) {
-      killedByHealthCheck = true;
-      killTask(driver.get(), taskID);
+      kill(taskId.get(), gracePeriod);
     }
   }
 
 private:
-  void killTask(
-      ExecutorDriver* driver,
-      const TaskID& _taskId,
-      const Duration& gracePeriod)
+  void kill(const TaskID& _taskId, const Duration& gracePeriod)
   {
     if (launched && !killed) {
       // Send TASK_KILLING if the framework can handle it.
@@ -561,10 +649,7 @@ private:
       foreach (const FrameworkInfo::Capability& c,
                frameworkInfo->capabilities()) {
         if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
-          TaskStatus status;
-          status.mutable_task_id()->CopyFrom(taskId.get());
-          status.set_state(TASK_KILLING);
-          driver->sendStatusUpdate(status);
+          update(taskId.get(), TASK_KILLING);
           break;
         }
       }
@@ -606,10 +691,7 @@ private:
     }
   }
 
-  void reaped(
-      ExecutorDriver* driver,
-      pid_t pid,
-      const Future<Option<int> >& status_)
+  void reaped(pid_t pid, const Future<Option<int> >& status_)
   {
     TaskState taskState;
     string message;
@@ -632,7 +714,7 @@ private:
         taskState = TASK_FINISHED;
       } else if (killed) {
         // Send TASK_KILLED if the task was killed as a result of
-        // killTask() or shutdown().
+        // kill() or shutdown().
         taskState = TASK_KILLED;
       } else {
         taskState = TASK_FAILED;
@@ -645,22 +727,17 @@ private:
 
     CHECK_SOME(taskId);
 
-    TaskStatus taskStatus;
-    taskStatus.mutable_task_id()->MergeFrom(taskId.get());
-    taskStatus.set_state(taskState);
-    taskStatus.set_message(message);
     if (killed && killedByHealthCheck) {
-      taskStatus.set_healthy(false);
+      update(taskId.get(), taskState, false, message);
+    } else {
+      update(taskId.get(), taskState, None(), message);
     }
 
-    driver->sendStatusUpdate(taskStatus);
-
-    // This is a hack to ensure the message is sent to the
-    // slave before we exit the process. Without this, we
-    // may exit before libprocess has sent the data over
-    // the socket. See MESOS-4111.
+    // TODO(qianzhang): Remove this hack since the executor now receives
+    // acknowledgements for status updates. The executor can terminate
+    // after it receives an ACK for a terminal status update.
     os::sleep(Seconds(1));
-    driver->stop();
+    terminate(self());
   }
 
   void escalated(Duration timeout)
@@ -728,10 +805,49 @@ private:
          << stringify(healthPid) << endl;
   }
 
+  void update(
+      const TaskID& taskID,
+      const TaskState& state,
+      const Option<bool>& healthy = None(),
+      const Option<string>& message = None())
+  {
+    UUID uuid = UUID::random();
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskID);
+    status.mutable_executor_id()->CopyFrom(executorId);
+
+    status.set_state(state);
+    status.set_source(TaskStatus::SOURCE_EXECUTOR);
+    status.set_uuid(uuid.toBytes());
+
+    if (healthy.isSome()) {
+      status.set_healthy(healthy.get());
+    }
+
+    if (message.isSome()) {
+      status.set_message(message.get());
+    }
+
+    Call call;
+    call.set_type(Call::UPDATE);
+
+    call.mutable_framework_id()->CopyFrom(frameworkId);
+    call.mutable_executor_id()->CopyFrom(executorId);
+
+    call.mutable_update()->mutable_status()->CopyFrom(status);
+
+    // Capture the status update.
+    updates[uuid] = call.update();
+
+    mesos->send(call);
+  }
+
   enum State
   {
-    REGISTERING, // Executor is launched but not (re-)registered yet.
-    REGISTERED,  // Executor has (re-)registered.
+    CONNECTED,
+    DISCONNECTED,
+    SUBSCRIBED
   } state;
 
   bool launched;
@@ -742,7 +858,6 @@ private:
   Duration shutdownGracePeriod;
   Option<KillPolicy> killPolicy;
   Timer escalationTimer;
-  Option<ExecutorDriver*> driver;
   Option<FrameworkInfo> frameworkInfo;
   Option<TaskID> taskId;
   string healthCheckDir;
@@ -751,99 +866,15 @@ private:
   Option<string> workingDirectory;
   Option<string> user;
   Option<string> taskCommand;
-};
-
-
-class CommandExecutor: public Executor
-{
-public:
-  CommandExecutor(
-      const Option<char**>& override,
-      const string& healthCheckDir,
-      const Option<string>& sandboxDirectory,
-      const Option<string>& workingDirectory,
-      const Option<string>& user,
-      const Option<string>& taskCommand,
-      const Duration& shutdownGracePeriod)
-  {
-    process = new CommandExecutorProcess(
-        override,
-        healthCheckDir,
-        sandboxDirectory,
-        workingDirectory,
-        user,
-        taskCommand,
-        shutdownGracePeriod);
-
-    spawn(process);
-  }
-
-  virtual ~CommandExecutor()
-  {
-    terminate(process);
-    wait(process);
-    delete process;
-  }
-
-  virtual void registered(
-        ExecutorDriver* driver,
-        const ExecutorInfo& executorInfo,
-        const FrameworkInfo& frameworkInfo,
-        const SlaveInfo& slaveInfo)
-  {
-    dispatch(process,
-             &CommandExecutorProcess::registered,
-             driver,
-             executorInfo,
-             frameworkInfo,
-             slaveInfo);
-  }
-
-  virtual void reregistered(
-      ExecutorDriver* driver,
-      const SlaveInfo& slaveInfo)
-  {
-    dispatch(process,
-             &CommandExecutorProcess::reregistered,
-             driver,
-             slaveInfo);
-  }
-
-  virtual void disconnected(ExecutorDriver* driver)
-  {
-    dispatch(process, &CommandExecutorProcess::disconnected, driver);
-  }
-
-  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
-  {
-    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
-  }
-
-  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
-  {
-    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
-  }
-
-  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
-  {
-    dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
-  }
-
-  virtual void shutdown(ExecutorDriver* driver)
-  {
-    dispatch(process, &CommandExecutorProcess::shutdown, driver);
-  }
-
-  virtual void error(ExecutorDriver* driver, const string& data)
-  {
-    dispatch(process, &CommandExecutorProcess::error, driver, data);
-  }
-
-private:
-  CommandExecutorProcess* process;
+  const FrameworkID frameworkId;
+  const ExecutorID executorId;
+  Owned<Mesos> mesos;
+  LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+  Option<TaskInfo> task; // Unacknowledged task.
 };
 
 } // namespace internal {
+} // namespace v1 {
 } // namespace mesos {
 
 
@@ -897,6 +928,8 @@ public:
 int main(int argc, char** argv)
 {
   Flags flags;
+  FrameworkID frameworkId;
+  ExecutorID executorId;
 
   // Load flags from command line.
   Try<Nothing> load = flags.load(None(), &argc, &argv);
@@ -929,6 +962,20 @@ int main(int argc, char** argv)
     ? envPath.get()
     : os::realpath(Path(argv[0]).dirname()).get();
 
+  Option<string> value = os::getenv("MESOS_FRAMEWORK_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+  }
+  frameworkId.set_value(value.get());
+
+  value = os::getenv("MESOS_EXECUTOR_ID");
+  if (value.isNone()) {
+    EXIT(EXIT_FAILURE)
+      << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+  }
+  executorId.set_value(value.get());
+
   // Get executor shutdown grace period from the environment.
   //
   // NOTE: We avoided introducing a command executor flag for this
@@ -936,7 +983,7 @@ int main(int argc, char** argv)
   // This makes it difficult to add or remove command executor flags
   // that are unconditionally set by the agent.
   Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
-  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
   if (value.isSome()) {
     Try<Duration> parse = Duration::parse(value.get());
     if (parse.isError()) {
@@ -948,16 +995,20 @@ int main(int argc, char** argv)
     shutdownGracePeriod = parse.get();
   }
 
-  mesos::internal::CommandExecutor executor(
-      override,
-      path,
-      flags.sandbox_directory,
-      flags.working_directory,
-      flags.user,
-      flags.task_command,
-      shutdownGracePeriod);
-
-  mesos::MesosExecutorDriver driver(&executor);
-
-  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+  Owned<mesos::v1::internal::HttpCommandExecutor> executor(
+      new mesos::v1::internal::HttpCommandExecutor(
+          override,
+          path,
+          flags.sandbox_directory,
+          flags.working_directory,
+          flags.user,
+          flags.task_command,
+          frameworkId,
+          executorId,
+          shutdownGracePeriod));
+
+  process::spawn(executor.get());
+  process::wait(executor.get());
+
+  return EXIT_SUCCESS;
 }


[4/4] mesos git commit: Added HTTP command executor to make files.

Posted by vi...@apache.org.
Added HTTP command executor to make files.

Added HTTP command executor to make files. For now the content in
http_command_executor.cpp is identical to executor.cpp, and it
will be updated in the subsequent review.

Review: https://reviews.apache.org/r/44423/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed304030
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed304030
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed304030

Branch: refs/heads/master
Commit: ed304030fc5a36d1c1f13e505d6b56f928a81cd4
Parents: e492b54
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:21 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                        |   5 +
 src/launcher/http_command_executor.cpp | 963 ++++++++++++++++++++++++++++
 2 files changed, 968 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a8f6831..139935f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1181,6 +1181,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
 mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_executor_LDADD = libmesos.la $(LDADD)
 
+pkglibexec_PROGRAMS += mesos-http-executor
+mesos_http_executor_SOURCES = launcher/http_command_executor.cpp
+mesos_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_http_executor_LDADD = libmesos.la $(LDADD)
+
 pkglibexec_PROGRAMS += mesos-containerizer
 mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
 mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
new file mode 100644
index 0000000..7677391
--- /dev/null
+++ b/src/launcher/http_command_executor.cpp
@@ -0,0 +1,963 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <signal.h>
+#include <stdio.h>
+
+#include <sys/wait.h>
+
+#include <iostream>
+#include <list>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/reap.hpp>
+#include <process/timer.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
+
+#include "common/http.hpp"
+#include "common/status_utils.hpp"
+
+#ifdef __linux__
+#include "linux/fs.hpp"
+#endif
+
+#include "logging/logging.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/constants.hpp"
+
+using namespace mesos::internal::slave;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+
+using namespace process;
+
+class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+{
+public:
+  CommandExecutorProcess(
+      const Option<char**>& override,
+      const string& _healthCheckDir,
+      const Option<string>& _sandboxDirectory,
+      const Option<string>& _workingDirectory,
+      const Option<string>& _user,
+      const Option<string>& _taskCommand,
+      const Duration& _shutdownGracePeriod)
+    : state(REGISTERING),
+      launched(false),
+      killed(false),
+      killedByHealthCheck(false),
+      pid(-1),
+      healthPid(-1),
+      shutdownGracePeriod(_shutdownGracePeriod),
+      driver(None()),
+      frameworkInfo(None()),
+      taskId(None()),
+      healthCheckDir(_healthCheckDir),
+      override(override),
+      sandboxDirectory(_sandboxDirectory),
+      workingDirectory(_workingDirectory),
+      user(_user),
+      taskCommand(_taskCommand) {}
+
+  virtual ~CommandExecutorProcess() {}
+
+  void registered(
+      ExecutorDriver* _driver,
+      const ExecutorInfo& _executorInfo,
+      const FrameworkInfo& _frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    CHECK_EQ(REGISTERING, state);
+
+    cout << "Registered executor on " << slaveInfo.hostname() << endl;
+
+    driver = _driver;
+    frameworkInfo = _frameworkInfo;
+
+    state = REGISTERED;
+  }
+
+  void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    CHECK(state == REGISTERED || state == REGISTERING) << state;
+
+    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+
+    state = REGISTERED;
+  }
+
+  void disconnected(ExecutorDriver* driver) {}
+
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    CHECK_EQ(REGISTERED, state);
+
+    if (launched) {
+      TaskStatus status;
+      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.set_state(TASK_FAILED);
+      status.set_message(
+          "Attempted to run multiple tasks using a \"command\" executor");
+
+      driver->sendStatusUpdate(status);
+      return;
+    }
+
+    // Capture the TaskID.
+    taskId = task.task_id();
+
+    // Capture the kill policy.
+    if (task.has_kill_policy()) {
+      killPolicy = task.kill_policy();
+    }
+
+    // Determine the command to launch the task.
+    CommandInfo command;
+
+    if (taskCommand.isSome()) {
+      // Get CommandInfo from a JSON string.
+      Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
+      if (object.isError()) {
+        cerr << "Failed to parse JSON: " << object.error() << endl;
+        abort();
+      }
+
+      Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+      if (parse.isError()) {
+        cerr << "Failed to parse protobuf: " << parse.error() << endl;
+        abort();
+      }
+
+      command = parse.get();
+    } else if (task.has_command()) {
+      command = task.command();
+    } else {
+      CHECK_SOME(override)
+        << "Expecting task '" << task.task_id()
+        << "' to have a command!";
+    }
+
+    if (override.isNone()) {
+      // TODO(jieyu): For now, we just fail the executor if the task's
+      // CommandInfo is not valid. The framework will receive
+      // TASK_FAILED for the task, and will most likely find out the
+      // cause with some debugging. This is a temporary solution. A more
+      // correct solution is to perform this validation at master side.
+      if (command.shell()) {
+        CHECK(command.has_value())
+          << "Shell command of task '" << task.task_id()
+          << "' is not specified!";
+      } else {
+        CHECK(command.has_value())
+          << "Executable of task '" << task.task_id()
+          << "' is not specified!";
+      }
+    }
+
+    cout << "Starting task " << task.task_id() << endl;
+
+    // TODO(benh): Clean this up with the new 'Fork' abstraction.
+    // Use pipes to determine which child has successfully changed
+    // session. This is needed as the setsid call can fail from other
+    // processes having the same group id.
+    int pipes[2];
+    if (pipe(pipes) < 0) {
+      perror("Failed to create a pipe");
+      abort();
+    }
+
+    // Set the FD_CLOEXEC flags on these pipes.
+    Try<Nothing> cloexec = os::cloexec(pipes[0]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[0]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    cloexec = os::cloexec(pipes[1]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[1]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    Option<string> rootfs;
+    if (sandboxDirectory.isSome()) {
+      // If 'sandbox_diretory' is specified, that means the user
+      // task specifies a root filesystem, and that root filesystem has
+      // already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH.
+      // The command executor is responsible for mounting the sandbox
+      // into the root filesystem, chrooting into it and changing the
+      // user before exec-ing the user process.
+      //
+      // TODO(gilbert): Consider a better way to detect if a root
+      // filesystem is specified for the command task.
+#ifdef __linux__
+      Result<string> user = os::user();
+      if (user.isError()) {
+        cerr << "Failed to get current user: " << user.error() << endl;
+        abort();
+      } else if (user.isNone()) {
+        cerr << "Current username is not found" << endl;
+        abort();
+      } else if (user.get() != "root") {
+        cerr << "The command executor requires root with rootfs" << endl;
+        abort();
+      }
+
+      rootfs = path::join(
+          os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+
+      string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
+      if (!os::exists(sandbox)) {
+        Try<Nothing> mkdir = os::mkdir(sandbox);
+        if (mkdir.isError()) {
+          cerr << "Failed to create sandbox mount point  at '"
+               << sandbox << "': " << mkdir.error() << endl;
+          abort();
+        }
+      }
+
+      // Mount the sandbox into the container rootfs.
+      // We need to perform a recursive mount because we want all the
+      // volume mounts in the sandbox to be also mounted in the container
+      // root filesystem. However, since the container root filesystem
+      // is also mounted in the sandbox, after the recursive mount we
+      // also need to unmount the root filesystem in the mounted sandbox.
+      Try<Nothing> mount = fs::mount(
+          os::getcwd(),
+          sandbox,
+          None(),
+          MS_BIND | MS_REC,
+          NULL);
+
+      if (mount.isError()) {
+        cerr << "Unable to mount the work directory into container "
+             << "rootfs: " << mount.error() << endl;;
+        abort();
+      }
+
+      // Umount the root filesystem path in the mounted sandbox after
+      // the recursive mount.
+      Try<Nothing> unmountAll = fs::unmountAll(path::join(
+          sandbox,
+          COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
+      if (unmountAll.isError()) {
+        cerr << "Unable to unmount rootfs under mounted sandbox: "
+             << unmountAll.error() << endl;
+        abort();
+      }
+#else
+      cerr << "Not expecting root volume with non-linux platform." << endl;
+      abort();
+#endif // __linux__
+    }
+
+    // Prepare the argv before fork as it's not async signal safe.
+    char **argv = new char*[command.arguments().size() + 1];
+    for (int i = 0; i < command.arguments().size(); i++) {
+      argv[i] = (char*) command.arguments(i).c_str();
+    }
+    argv[command.arguments().size()] = NULL;
+
+    // Prepare the command log message.
+    string commandString;
+    if (override.isSome()) {
+      char** argv = override.get();
+      // argv is guaranteed to be NULL terminated and we rely on
+      // that fact to print command to be executed.
+      for (int i = 0; argv[i] != NULL; i++) {
+        commandString += string(argv[i]) + " ";
+      }
+    } else if (command.shell()) {
+      commandString = "sh -c '" + command.value() + "'";
+    } else {
+      commandString =
+        "[" + command.value() + ", " +
+        strings::join(", ", command.arguments()) + "]";
+    }
+
+    if ((pid = fork()) == -1) {
+      cerr << "Failed to fork to run " << commandString << ": "
+           << os::strerror(errno) << endl;
+      abort();
+    }
+
+    // TODO(jieyu): Make the child process async signal safe.
+    if (pid == 0) {
+      // In child process, we make cleanup easier by putting process
+      // into it's own session.
+      os::close(pipes[0]);
+
+      // NOTE: We setsid() in a loop because setsid() might fail if another
+      // process has the same process group id as the calling process.
+      while ((pid = setsid()) == -1) {
+        perror("Could not put command in its own session, setsid");
+
+        cout << "Forking another process and retrying" << endl;
+
+        if ((pid = fork()) == -1) {
+          perror("Failed to fork to launch command");
+          abort();
+        }
+
+        if (pid > 0) {
+          // In parent process. It is ok to suicide here, because
+          // we're not watching this process.
+          exit(0);
+        }
+      }
+
+      if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
+        perror("Failed to write PID on pipe");
+        abort();
+      }
+
+      os::close(pipes[1]);
+
+      if (rootfs.isSome()) {
+#ifdef __linux__
+        if (user.isSome()) {
+          // This is a work around to fix the problem that after we chroot
+          // os::su call afterwards failed because the linker may not be
+          // able to find the necessary library in the rootfs.
+          // We call os::su before chroot here to force the linker to load
+          // into memory.
+          // We also assume it's safe to su to "root" user since
+          // filesystem/linux.cpp checks for root already.
+          os::su("root");
+        }
+
+        Try<Nothing> chroot = fs::chroot::enter(rootfs.get());
+        if (chroot.isError()) {
+          cerr << "Failed to enter chroot '" << rootfs.get()
+               << "': " << chroot.error() << endl;;
+          abort();
+        }
+
+        // Determine the current working directory for the executor.
+        string cwd;
+        if (workingDirectory.isSome()) {
+          cwd = workingDirectory.get();
+        } else {
+          CHECK_SOME(sandboxDirectory);
+          cwd = sandboxDirectory.get();
+        }
+
+        Try<Nothing> chdir = os::chdir(cwd);
+        if (chdir.isError()) {
+          cerr << "Failed to chdir into current working directory '"
+               << cwd << "': " << chdir.error() << endl;
+          abort();
+        }
+
+        if (user.isSome()) {
+          Try<Nothing> su = os::su(user.get());
+          if (su.isError()) {
+            cerr << "Failed to change user to '" << user.get() << "': "
+                 << su.error() << endl;
+            abort();
+          }
+        }
+#else
+        cerr << "Rootfs is only supported on Linux" << endl;
+        abort();
+#endif // __linux__
+      }
+
+      cout << commandString << endl;
+
+      // The child has successfully setsid, now run the command.
+      if (override.isNone()) {
+        if (command.shell()) {
+          execlp(
+              "sh",
+              "sh",
+              "-c",
+              command.value().c_str(),
+              (char*) NULL);
+        } else {
+          execvp(command.value().c_str(), argv);
+        }
+      } else {
+        char** argv = override.get();
+        execvp(argv[0], argv);
+      }
+
+      perror("Failed to exec");
+      abort();
+    }
+
+    delete[] argv;
+
+    // In parent process.
+    os::close(pipes[1]);
+
+    // Get the child's pid via the pipe.
+    if (read(pipes[0], &pid, sizeof(pid)) == -1) {
+      cerr << "Failed to get child PID from pipe, read: "
+           << os::strerror(errno) << endl;
+      abort();
+    }
+
+    os::close(pipes[0]);
+
+    cout << "Forked command at " << pid << endl;
+
+    if (task.has_health_check()) {
+      launchHealthCheck(task);
+    }
+
+    // Monitor this process.
+    process::reap(pid)
+      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+
+    TaskStatus status;
+    status.mutable_task_id()->MergeFrom(task.task_id());
+    status.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(status);
+
+    launched = true;
+  }
+
+  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    cout << "Received killTask for task " << taskId.value() << endl;
+
+    // Default grace period is set to 3s for backwards compatibility.
+    //
+    // TODO(alexr): Replace it with a more meaningful default, e.g.
+    // `shutdownGracePeriod` after the deprecation cycle, started in 0.29.
+    Duration gracePeriod = Seconds(3);
+
+    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
+      gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
+    }
+
+    killTask(driver, taskId, gracePeriod);
+  }
+
+  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+
+  void shutdown(ExecutorDriver* driver)
+  {
+    cout << "Shutting down" << endl;
+
+    // NOTE: We leave a small buffer of time to do the forced kill, otherwise
+    // the agent may destroy the container before we can send `TASK_KILLED`.
+    //
+    // TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
+    // immediately after the watched process has exited.
+    Duration gracePeriod =
+      shutdownGracePeriod - process::MAX_REAP_INTERVAL() - Seconds(1);
+
+    // Since the command executor manages a single task,
+    // shutdown boils down to killing this task.
+    //
+    // TODO(bmahler): If a shutdown arrives after a kill task within
+    // the grace period of the `KillPolicy`, we may need to escalate
+    // more quickly (e.g. the shutdown grace period allotted by the
+    // agent is smaller than the kill grace period).
+    if (launched) {
+      CHECK_SOME(taskId);
+      killTask(driver, taskId.get(), gracePeriod);
+    } else {
+      driver->stop();
+    }
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& message) {}
+
+protected:
+  virtual void initialize()
+  {
+    install<TaskHealthStatus>(
+        &CommandExecutorProcess::taskHealthUpdated,
+        &TaskHealthStatus::task_id,
+        &TaskHealthStatus::healthy,
+        &TaskHealthStatus::kill_task);
+  }
+
+  void taskHealthUpdated(
+      const TaskID& taskID,
+      const bool& healthy,
+      const bool& initiateTaskKill)
+  {
+    if (driver.isNone()) {
+      return;
+    }
+
+    cout << "Received task health update, healthy: "
+         << stringify(healthy) << endl;
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskID);
+    status.set_healthy(healthy);
+    status.set_state(TASK_RUNNING);
+    driver.get()->sendStatusUpdate(status);
+
+    if (initiateTaskKill) {
+      killedByHealthCheck = true;
+      killTask(driver.get(), taskID);
+    }
+  }
+
+private:
+  void killTask(
+      ExecutorDriver* driver,
+      const TaskID& _taskId,
+      const Duration& gracePeriod)
+  {
+    if (launched && !killed) {
+      // Send TASK_KILLING if the framework can handle it.
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(taskId);
+      CHECK(taskId.get() == _taskId);
+
+      foreach (const FrameworkInfo::Capability& c,
+               frameworkInfo->capabilities()) {
+        if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+          TaskStatus status;
+          status.mutable_task_id()->CopyFrom(taskId.get());
+          status.set_state(TASK_KILLING);
+          driver->sendStatusUpdate(status);
+          break;
+        }
+      }
+
+      // Now perform signal escalation to begin killing the task.
+      CHECK_GT(pid, 0);
+
+      cout << "Sending SIGTERM to process tree at pid " << pid << endl;
+
+      Try<std::list<os::ProcessTree> > trees =
+        os::killtree(pid, SIGTERM, true, true);
+
+      if (trees.isError()) {
+        cerr << "Failed to kill the process tree rooted at pid " << pid
+             << ": " << trees.error() << endl;
+
+        // Send SIGTERM directly to process 'pid' as it may not have
+        // received signal before os::killtree() failed.
+        ::kill(pid, SIGTERM);
+      } else {
+        cout << "Sent SIGTERM to the following process trees:\n"
+             << stringify(trees.get()) << endl;
+      }
+
+      escalationTimer =
+        delay(gracePeriod, self(), &Self::escalated, gracePeriod);
+
+      killed = true;
+    }
+
+    // Cleanup health check process.
+    //
+    // TODO(bmahler): Consider doing this after the task has been
+    // reaped, since a framework may be interested in health
+    // information while the task is being killed (consider a
+    // task that takes 30 minutes to be cleanly killed).
+    if (healthPid != -1) {
+      os::killtree(healthPid, SIGKILL);
+    }
+  }
+
+  void reaped(
+      ExecutorDriver* driver,
+      pid_t pid,
+      const Future<Option<int> >& status_)
+  {
+    TaskState taskState;
+    string message;
+
+    Clock::cancel(escalationTimer);
+
+    if (!status_.isReady()) {
+      taskState = TASK_FAILED;
+      message =
+        "Failed to get exit status for Command: " +
+        (status_.isFailed() ? status_.failure() : "future discarded");
+    } else if (status_.get().isNone()) {
+      taskState = TASK_FAILED;
+      message = "Failed to get exit status for Command";
+    } else {
+      int status = status_.get().get();
+      CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
+
+      if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
+        taskState = TASK_FINISHED;
+      } else if (killed) {
+        // Send TASK_KILLED if the task was killed as a result of
+        // killTask() or shutdown().
+        taskState = TASK_KILLED;
+      } else {
+        taskState = TASK_FAILED;
+      }
+
+      message = "Command " + WSTRINGIFY(status);
+    }
+
+    cout << message << " (pid: " << pid << ")" << endl;
+
+    CHECK_SOME(taskId);
+
+    TaskStatus taskStatus;
+    taskStatus.mutable_task_id()->MergeFrom(taskId.get());
+    taskStatus.set_state(taskState);
+    taskStatus.set_message(message);
+    if (killed && killedByHealthCheck) {
+      taskStatus.set_healthy(false);
+    }
+
+    driver->sendStatusUpdate(taskStatus);
+
+    // This is a hack to ensure the message is sent to the
+    // slave before we exit the process. Without this, we
+    // may exit before libprocess has sent the data over
+    // the socket. See MESOS-4111.
+    os::sleep(Seconds(1));
+    driver->stop();
+  }
+
+  void escalated(Duration timeout)
+  {
+    cout << "Process " << pid << " did not terminate after " << timeout
+         << ", sending SIGKILL to process tree at " << pid << endl;
+
+    // TODO(nnielsen): Sending SIGTERM in the first stage of the
+    // shutdown may leave orphan processes hanging off init. This
+    // scenario will be handled when PID namespace encapsulated
+    // execution is in place.
+    Try<std::list<os::ProcessTree> > trees =
+      os::killtree(pid, SIGKILL, true, true);
+
+    if (trees.isError()) {
+      cerr << "Failed to kill the process tree rooted at pid "
+           << pid << ": " << trees.error() << endl;
+
+      // Process 'pid' may not have received signal before
+      // os::killtree() failed. To make sure process 'pid' is reaped
+      // we send SIGKILL directly.
+      ::kill(pid, SIGKILL);
+    } else {
+      cout << "Killed the following process trees:\n" << stringify(trees.get())
+           << endl;
+    }
+  }
+
+  void launchHealthCheck(const TaskInfo& task)
+  {
+    CHECK(task.has_health_check());
+
+    JSON::Object json = JSON::protobuf(task.health_check());
+
+    // Launch the subprocess using 'exec' style so that quotes can
+    // be properly handled.
+    vector<string> argv(4);
+    argv[0] = "mesos-health-check";
+    argv[1] = "--executor=" + stringify(self());
+    argv[2] = "--health_check_json=" + stringify(json);
+    argv[3] = "--task_id=" + task.task_id().value();
+
+    cout << "Launching health check process: "
+         << path::join(healthCheckDir, "mesos-health-check")
+         << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
+
+    Try<Subprocess> healthProcess =
+      process::subprocess(
+        path::join(healthCheckDir, "mesos-health-check"),
+        argv,
+        // Intentionally not sending STDIN to avoid health check
+        // commands that expect STDIN input to block.
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDOUT_FILENO),
+        Subprocess::FD(STDERR_FILENO));
+
+    if (healthProcess.isError()) {
+      cerr << "Unable to launch health process: " << healthProcess.error();
+      return;
+    }
+
+    healthPid = healthProcess.get().pid();
+
+    cout << "Health check process launched at pid: "
+         << stringify(healthPid) << endl;
+  }
+
+  enum State
+  {
+    REGISTERING, // Executor is launched but not (re-)registered yet.
+    REGISTERED,  // Executor has (re-)registered.
+  } state;
+
+  bool launched;
+  bool killed;
+  bool killedByHealthCheck;
+  pid_t pid;
+  pid_t healthPid;
+  Duration shutdownGracePeriod;
+  Option<KillPolicy> killPolicy;
+  Timer escalationTimer;
+  Option<ExecutorDriver*> driver;
+  Option<FrameworkInfo> frameworkInfo;
+  Option<TaskID> taskId;
+  string healthCheckDir;
+  Option<char**> override;
+  Option<string> sandboxDirectory;
+  Option<string> workingDirectory;
+  Option<string> user;
+  Option<string> taskCommand;
+};
+
+
+class CommandExecutor: public Executor
+{
+public:
+  CommandExecutor(
+      const Option<char**>& override,
+      const string& healthCheckDir,
+      const Option<string>& sandboxDirectory,
+      const Option<string>& workingDirectory,
+      const Option<string>& user,
+      const Option<string>& taskCommand,
+      const Duration& shutdownGracePeriod)
+  {
+    process = new CommandExecutorProcess(
+        override,
+        healthCheckDir,
+        sandboxDirectory,
+        workingDirectory,
+        user,
+        taskCommand,
+        shutdownGracePeriod);
+
+    spawn(process);
+  }
+
+  virtual ~CommandExecutor()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  virtual void registered(
+        ExecutorDriver* driver,
+        const ExecutorInfo& executorInfo,
+        const FrameworkInfo& frameworkInfo,
+        const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::registered,
+             driver,
+             executorInfo,
+             frameworkInfo,
+             slaveInfo);
+  }
+
+  virtual void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::reregistered,
+             driver,
+             slaveInfo);
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::disconnected, driver);
+  }
+
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::shutdown, driver);
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::error, driver, data);
+  }
+
+private:
+  CommandExecutorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    // TODO(gilbert): Deprecate the 'override' flag since no one is
+    // using it, and it may cause confusing with 'task_command' flag.
+    add(&override,
+        "override",
+        "Whether to override the command the executor should run when the\n"
+        "task is launched. Only this flag is expected to be on the command\n"
+        "line and all arguments after the flag will be used as the\n"
+        "subsequent 'argv' to be used with 'execvp'",
+        false);
+
+    // The following flags are only applicable when a rootfs is
+    // provisioned for this command.
+    add(&sandbox_directory,
+        "sandbox_directory",
+        "The absolute path for the directory in the container where the\n"
+        "sandbox is mapped to");
+
+    add(&working_directory,
+        "working_directory",
+        "The working directory for the task in the container.");
+
+    add(&user,
+        "user",
+        "The user that the task should be running as.");
+
+    add(&task_command,
+        "task_command",
+        "If specified, this is the overrided command for launching the\n"
+        "task (instead of the command from TaskInfo).");
+
+    // TODO(nnielsen): Add 'prefix' option to enable replacing
+    // 'sh -c' with user specified wrapper.
+  }
+
+  bool override;
+  Option<string> sandbox_directory;
+  Option<string> working_directory;
+  Option<string> user;
+  Option<string> task_command;
+};
+
+
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  // Load flags from command line.
+  Try<Nothing> load = flags.load(None(), &argc, &argv);
+
+  if (load.isError()) {
+    cerr << flags.usage(load.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  // After flags.load(..., &argc, &argv) all flags will have been
+  // stripped from argv. Additionally, arguments after a "--"
+  // terminator will be preservered in argv and it is therefore
+  // possible to pass override and prefix commands which use
+  // "--foobar" style flags.
+  Option<char**> override = None();
+  if (flags.override) {
+    if (argc > 1) {
+      override = argv + 1;
+    }
+  }
+
+  const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
+
+  string path = envPath.isSome()
+    ? envPath.get()
+    : os::realpath(Path(argv[0]).dirname()).get();
+
+  // Get executor shutdown grace period from the environment.
+  //
+  // NOTE: We avoided introducing a command executor flag for this
+  // because the command executor exits if it sees an unknown flag.
+  // This makes it difficult to add or remove command executor flags
+  // that are unconditionally set by the agent.
+  Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
+  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  if (value.isSome()) {
+    Try<Duration> parse = Duration::parse(value.get());
+    if (parse.isError()) {
+      cerr << "Failed to parse value '" << value.get() << "'"
+           << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
+      return EXIT_FAILURE;
+    }
+
+    shutdownGracePeriod = parse.get();
+  }
+
+  mesos::internal::CommandExecutor executor(
+      override,
+      path,
+      flags.sandbox_directory,
+      flags.working_directory,
+      flags.user,
+      flags.task_command,
+      shutdownGracePeriod);
+
+  mesos::MesosExecutorDriver driver(&executor);
+
+  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+}