You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/14 01:27:51 UTC

[4/4] mesos git commit: Added HTTP command executor to make files.

Added HTTP command executor to make files.

Added HTTP command executor to make files. For now the content in
http_command_executor.cpp is identical to executor.cpp, and it
will be updated in the subsequent review.

Review: https://reviews.apache.org/r/44423/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed304030
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed304030
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed304030

Branch: refs/heads/master
Commit: ed304030fc5a36d1c1f13e505d6b56f928a81cd4
Parents: e492b54
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:21 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700

----------------------------------------------------------------------
 src/Makefile.am                        |   5 +
 src/launcher/http_command_executor.cpp | 963 ++++++++++++++++++++++++++++
 2 files changed, 968 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a8f6831..139935f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1181,6 +1181,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
 mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
 mesos_executor_LDADD = libmesos.la $(LDADD)
 
+pkglibexec_PROGRAMS += mesos-http-executor
+mesos_http_executor_SOURCES = launcher/http_command_executor.cpp
+mesos_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_http_executor_LDADD = libmesos.la $(LDADD)
+
 pkglibexec_PROGRAMS += mesos-containerizer
 mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
 mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)

http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
new file mode 100644
index 0000000..7677391
--- /dev/null
+++ b/src/launcher/http_command_executor.cpp
@@ -0,0 +1,963 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <signal.h>
+#include <stdio.h>
+
+#include <sys/wait.h>
+
+#include <iostream>
+#include <list>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/reap.hpp>
+#include <process/timer.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
+
+#include "common/http.hpp"
+#include "common/status_utils.hpp"
+
+#ifdef __linux__
+#include "linux/fs.hpp"
+#endif
+
+#include "logging/logging.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/constants.hpp"
+
+using namespace mesos::internal::slave;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+
+using namespace process;
+
+class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+{
+public:
+  CommandExecutorProcess(
+      const Option<char**>& override,
+      const string& _healthCheckDir,
+      const Option<string>& _sandboxDirectory,
+      const Option<string>& _workingDirectory,
+      const Option<string>& _user,
+      const Option<string>& _taskCommand,
+      const Duration& _shutdownGracePeriod)
+    : state(REGISTERING),
+      launched(false),
+      killed(false),
+      killedByHealthCheck(false),
+      pid(-1),
+      healthPid(-1),
+      shutdownGracePeriod(_shutdownGracePeriod),
+      driver(None()),
+      frameworkInfo(None()),
+      taskId(None()),
+      healthCheckDir(_healthCheckDir),
+      override(override),
+      sandboxDirectory(_sandboxDirectory),
+      workingDirectory(_workingDirectory),
+      user(_user),
+      taskCommand(_taskCommand) {}
+
+  virtual ~CommandExecutorProcess() {}
+
+  void registered(
+      ExecutorDriver* _driver,
+      const ExecutorInfo& _executorInfo,
+      const FrameworkInfo& _frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    CHECK_EQ(REGISTERING, state);
+
+    cout << "Registered executor on " << slaveInfo.hostname() << endl;
+
+    driver = _driver;
+    frameworkInfo = _frameworkInfo;
+
+    state = REGISTERED;
+  }
+
+  void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    CHECK(state == REGISTERED || state == REGISTERING) << state;
+
+    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+
+    state = REGISTERED;
+  }
+
+  void disconnected(ExecutorDriver* driver) {}
+
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    CHECK_EQ(REGISTERED, state);
+
+    if (launched) {
+      TaskStatus status;
+      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.set_state(TASK_FAILED);
+      status.set_message(
+          "Attempted to run multiple tasks using a \"command\" executor");
+
+      driver->sendStatusUpdate(status);
+      return;
+    }
+
+    // Capture the TaskID.
+    taskId = task.task_id();
+
+    // Capture the kill policy.
+    if (task.has_kill_policy()) {
+      killPolicy = task.kill_policy();
+    }
+
+    // Determine the command to launch the task.
+    CommandInfo command;
+
+    if (taskCommand.isSome()) {
+      // Get CommandInfo from a JSON string.
+      Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
+      if (object.isError()) {
+        cerr << "Failed to parse JSON: " << object.error() << endl;
+        abort();
+      }
+
+      Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+      if (parse.isError()) {
+        cerr << "Failed to parse protobuf: " << parse.error() << endl;
+        abort();
+      }
+
+      command = parse.get();
+    } else if (task.has_command()) {
+      command = task.command();
+    } else {
+      CHECK_SOME(override)
+        << "Expecting task '" << task.task_id()
+        << "' to have a command!";
+    }
+
+    if (override.isNone()) {
+      // TODO(jieyu): For now, we just fail the executor if the task's
+      // CommandInfo is not valid. The framework will receive
+      // TASK_FAILED for the task, and will most likely find out the
+      // cause with some debugging. This is a temporary solution. A more
+      // correct solution is to perform this validation at master side.
+      if (command.shell()) {
+        CHECK(command.has_value())
+          << "Shell command of task '" << task.task_id()
+          << "' is not specified!";
+      } else {
+        CHECK(command.has_value())
+          << "Executable of task '" << task.task_id()
+          << "' is not specified!";
+      }
+    }
+
+    cout << "Starting task " << task.task_id() << endl;
+
+    // TODO(benh): Clean this up with the new 'Fork' abstraction.
+    // Use pipes to determine which child has successfully changed
+    // session. This is needed as the setsid call can fail from other
+    // processes having the same group id.
+    int pipes[2];
+    if (pipe(pipes) < 0) {
+      perror("Failed to create a pipe");
+      abort();
+    }
+
+    // Set the FD_CLOEXEC flags on these pipes.
+    Try<Nothing> cloexec = os::cloexec(pipes[0]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[0]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    cloexec = os::cloexec(pipes[1]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[1]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    Option<string> rootfs;
+    if (sandboxDirectory.isSome()) {
+      // If 'sandbox_diretory' is specified, that means the user
+      // task specifies a root filesystem, and that root filesystem has
+      // already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH.
+      // The command executor is responsible for mounting the sandbox
+      // into the root filesystem, chrooting into it and changing the
+      // user before exec-ing the user process.
+      //
+      // TODO(gilbert): Consider a better way to detect if a root
+      // filesystem is specified for the command task.
+#ifdef __linux__
+      Result<string> user = os::user();
+      if (user.isError()) {
+        cerr << "Failed to get current user: " << user.error() << endl;
+        abort();
+      } else if (user.isNone()) {
+        cerr << "Current username is not found" << endl;
+        abort();
+      } else if (user.get() != "root") {
+        cerr << "The command executor requires root with rootfs" << endl;
+        abort();
+      }
+
+      rootfs = path::join(
+          os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+
+      string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
+      if (!os::exists(sandbox)) {
+        Try<Nothing> mkdir = os::mkdir(sandbox);
+        if (mkdir.isError()) {
+          cerr << "Failed to create sandbox mount point  at '"
+               << sandbox << "': " << mkdir.error() << endl;
+          abort();
+        }
+      }
+
+      // Mount the sandbox into the container rootfs.
+      // We need to perform a recursive mount because we want all the
+      // volume mounts in the sandbox to be also mounted in the container
+      // root filesystem. However, since the container root filesystem
+      // is also mounted in the sandbox, after the recursive mount we
+      // also need to unmount the root filesystem in the mounted sandbox.
+      Try<Nothing> mount = fs::mount(
+          os::getcwd(),
+          sandbox,
+          None(),
+          MS_BIND | MS_REC,
+          NULL);
+
+      if (mount.isError()) {
+        cerr << "Unable to mount the work directory into container "
+             << "rootfs: " << mount.error() << endl;;
+        abort();
+      }
+
+      // Umount the root filesystem path in the mounted sandbox after
+      // the recursive mount.
+      Try<Nothing> unmountAll = fs::unmountAll(path::join(
+          sandbox,
+          COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
+      if (unmountAll.isError()) {
+        cerr << "Unable to unmount rootfs under mounted sandbox: "
+             << unmountAll.error() << endl;
+        abort();
+      }
+#else
+      cerr << "Not expecting root volume with non-linux platform." << endl;
+      abort();
+#endif // __linux__
+    }
+
+    // Prepare the argv before fork as it's not async signal safe.
+    char **argv = new char*[command.arguments().size() + 1];
+    for (int i = 0; i < command.arguments().size(); i++) {
+      argv[i] = (char*) command.arguments(i).c_str();
+    }
+    argv[command.arguments().size()] = NULL;
+
+    // Prepare the command log message.
+    string commandString;
+    if (override.isSome()) {
+      char** argv = override.get();
+      // argv is guaranteed to be NULL terminated and we rely on
+      // that fact to print command to be executed.
+      for (int i = 0; argv[i] != NULL; i++) {
+        commandString += string(argv[i]) + " ";
+      }
+    } else if (command.shell()) {
+      commandString = "sh -c '" + command.value() + "'";
+    } else {
+      commandString =
+        "[" + command.value() + ", " +
+        strings::join(", ", command.arguments()) + "]";
+    }
+
+    if ((pid = fork()) == -1) {
+      cerr << "Failed to fork to run " << commandString << ": "
+           << os::strerror(errno) << endl;
+      abort();
+    }
+
+    // TODO(jieyu): Make the child process async signal safe.
+    if (pid == 0) {
+      // In child process, we make cleanup easier by putting process
+      // into it's own session.
+      os::close(pipes[0]);
+
+      // NOTE: We setsid() in a loop because setsid() might fail if another
+      // process has the same process group id as the calling process.
+      while ((pid = setsid()) == -1) {
+        perror("Could not put command in its own session, setsid");
+
+        cout << "Forking another process and retrying" << endl;
+
+        if ((pid = fork()) == -1) {
+          perror("Failed to fork to launch command");
+          abort();
+        }
+
+        if (pid > 0) {
+          // In parent process. It is ok to suicide here, because
+          // we're not watching this process.
+          exit(0);
+        }
+      }
+
+      if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
+        perror("Failed to write PID on pipe");
+        abort();
+      }
+
+      os::close(pipes[1]);
+
+      if (rootfs.isSome()) {
+#ifdef __linux__
+        if (user.isSome()) {
+          // This is a work around to fix the problem that after we chroot
+          // os::su call afterwards failed because the linker may not be
+          // able to find the necessary library in the rootfs.
+          // We call os::su before chroot here to force the linker to load
+          // into memory.
+          // We also assume it's safe to su to "root" user since
+          // filesystem/linux.cpp checks for root already.
+          os::su("root");
+        }
+
+        Try<Nothing> chroot = fs::chroot::enter(rootfs.get());
+        if (chroot.isError()) {
+          cerr << "Failed to enter chroot '" << rootfs.get()
+               << "': " << chroot.error() << endl;;
+          abort();
+        }
+
+        // Determine the current working directory for the executor.
+        string cwd;
+        if (workingDirectory.isSome()) {
+          cwd = workingDirectory.get();
+        } else {
+          CHECK_SOME(sandboxDirectory);
+          cwd = sandboxDirectory.get();
+        }
+
+        Try<Nothing> chdir = os::chdir(cwd);
+        if (chdir.isError()) {
+          cerr << "Failed to chdir into current working directory '"
+               << cwd << "': " << chdir.error() << endl;
+          abort();
+        }
+
+        if (user.isSome()) {
+          Try<Nothing> su = os::su(user.get());
+          if (su.isError()) {
+            cerr << "Failed to change user to '" << user.get() << "': "
+                 << su.error() << endl;
+            abort();
+          }
+        }
+#else
+        cerr << "Rootfs is only supported on Linux" << endl;
+        abort();
+#endif // __linux__
+      }
+
+      cout << commandString << endl;
+
+      // The child has successfully setsid, now run the command.
+      if (override.isNone()) {
+        if (command.shell()) {
+          execlp(
+              "sh",
+              "sh",
+              "-c",
+              command.value().c_str(),
+              (char*) NULL);
+        } else {
+          execvp(command.value().c_str(), argv);
+        }
+      } else {
+        char** argv = override.get();
+        execvp(argv[0], argv);
+      }
+
+      perror("Failed to exec");
+      abort();
+    }
+
+    delete[] argv;
+
+    // In parent process.
+    os::close(pipes[1]);
+
+    // Get the child's pid via the pipe.
+    if (read(pipes[0], &pid, sizeof(pid)) == -1) {
+      cerr << "Failed to get child PID from pipe, read: "
+           << os::strerror(errno) << endl;
+      abort();
+    }
+
+    os::close(pipes[0]);
+
+    cout << "Forked command at " << pid << endl;
+
+    if (task.has_health_check()) {
+      launchHealthCheck(task);
+    }
+
+    // Monitor this process.
+    process::reap(pid)
+      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+
+    TaskStatus status;
+    status.mutable_task_id()->MergeFrom(task.task_id());
+    status.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(status);
+
+    launched = true;
+  }
+
+  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    cout << "Received killTask for task " << taskId.value() << endl;
+
+    // Default grace period is set to 3s for backwards compatibility.
+    //
+    // TODO(alexr): Replace it with a more meaningful default, e.g.
+    // `shutdownGracePeriod` after the deprecation cycle, started in 0.29.
+    Duration gracePeriod = Seconds(3);
+
+    if (killPolicy.isSome() && killPolicy->has_grace_period()) {
+      gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
+    }
+
+    killTask(driver, taskId, gracePeriod);
+  }
+
+  void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+
+  void shutdown(ExecutorDriver* driver)
+  {
+    cout << "Shutting down" << endl;
+
+    // NOTE: We leave a small buffer of time to do the forced kill, otherwise
+    // the agent may destroy the container before we can send `TASK_KILLED`.
+    //
+    // TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
+    // immediately after the watched process has exited.
+    Duration gracePeriod =
+      shutdownGracePeriod - process::MAX_REAP_INTERVAL() - Seconds(1);
+
+    // Since the command executor manages a single task,
+    // shutdown boils down to killing this task.
+    //
+    // TODO(bmahler): If a shutdown arrives after a kill task within
+    // the grace period of the `KillPolicy`, we may need to escalate
+    // more quickly (e.g. the shutdown grace period allotted by the
+    // agent is smaller than the kill grace period).
+    if (launched) {
+      CHECK_SOME(taskId);
+      killTask(driver, taskId.get(), gracePeriod);
+    } else {
+      driver->stop();
+    }
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& message) {}
+
+protected:
+  virtual void initialize()
+  {
+    install<TaskHealthStatus>(
+        &CommandExecutorProcess::taskHealthUpdated,
+        &TaskHealthStatus::task_id,
+        &TaskHealthStatus::healthy,
+        &TaskHealthStatus::kill_task);
+  }
+
+  void taskHealthUpdated(
+      const TaskID& taskID,
+      const bool& healthy,
+      const bool& initiateTaskKill)
+  {
+    if (driver.isNone()) {
+      return;
+    }
+
+    cout << "Received task health update, healthy: "
+         << stringify(healthy) << endl;
+
+    TaskStatus status;
+    status.mutable_task_id()->CopyFrom(taskID);
+    status.set_healthy(healthy);
+    status.set_state(TASK_RUNNING);
+    driver.get()->sendStatusUpdate(status);
+
+    if (initiateTaskKill) {
+      killedByHealthCheck = true;
+      killTask(driver.get(), taskID);
+    }
+  }
+
+private:
+  void killTask(
+      ExecutorDriver* driver,
+      const TaskID& _taskId,
+      const Duration& gracePeriod)
+  {
+    if (launched && !killed) {
+      // Send TASK_KILLING if the framework can handle it.
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(taskId);
+      CHECK(taskId.get() == _taskId);
+
+      foreach (const FrameworkInfo::Capability& c,
+               frameworkInfo->capabilities()) {
+        if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
+          TaskStatus status;
+          status.mutable_task_id()->CopyFrom(taskId.get());
+          status.set_state(TASK_KILLING);
+          driver->sendStatusUpdate(status);
+          break;
+        }
+      }
+
+      // Now perform signal escalation to begin killing the task.
+      CHECK_GT(pid, 0);
+
+      cout << "Sending SIGTERM to process tree at pid " << pid << endl;
+
+      Try<std::list<os::ProcessTree> > trees =
+        os::killtree(pid, SIGTERM, true, true);
+
+      if (trees.isError()) {
+        cerr << "Failed to kill the process tree rooted at pid " << pid
+             << ": " << trees.error() << endl;
+
+        // Send SIGTERM directly to process 'pid' as it may not have
+        // received signal before os::killtree() failed.
+        ::kill(pid, SIGTERM);
+      } else {
+        cout << "Sent SIGTERM to the following process trees:\n"
+             << stringify(trees.get()) << endl;
+      }
+
+      escalationTimer =
+        delay(gracePeriod, self(), &Self::escalated, gracePeriod);
+
+      killed = true;
+    }
+
+    // Cleanup health check process.
+    //
+    // TODO(bmahler): Consider doing this after the task has been
+    // reaped, since a framework may be interested in health
+    // information while the task is being killed (consider a
+    // task that takes 30 minutes to be cleanly killed).
+    if (healthPid != -1) {
+      os::killtree(healthPid, SIGKILL);
+    }
+  }
+
+  void reaped(
+      ExecutorDriver* driver,
+      pid_t pid,
+      const Future<Option<int> >& status_)
+  {
+    TaskState taskState;
+    string message;
+
+    Clock::cancel(escalationTimer);
+
+    if (!status_.isReady()) {
+      taskState = TASK_FAILED;
+      message =
+        "Failed to get exit status for Command: " +
+        (status_.isFailed() ? status_.failure() : "future discarded");
+    } else if (status_.get().isNone()) {
+      taskState = TASK_FAILED;
+      message = "Failed to get exit status for Command";
+    } else {
+      int status = status_.get().get();
+      CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
+
+      if (WIFEXITED(status) && WEXITSTATUS(status) == 0) {
+        taskState = TASK_FINISHED;
+      } else if (killed) {
+        // Send TASK_KILLED if the task was killed as a result of
+        // killTask() or shutdown().
+        taskState = TASK_KILLED;
+      } else {
+        taskState = TASK_FAILED;
+      }
+
+      message = "Command " + WSTRINGIFY(status);
+    }
+
+    cout << message << " (pid: " << pid << ")" << endl;
+
+    CHECK_SOME(taskId);
+
+    TaskStatus taskStatus;
+    taskStatus.mutable_task_id()->MergeFrom(taskId.get());
+    taskStatus.set_state(taskState);
+    taskStatus.set_message(message);
+    if (killed && killedByHealthCheck) {
+      taskStatus.set_healthy(false);
+    }
+
+    driver->sendStatusUpdate(taskStatus);
+
+    // This is a hack to ensure the message is sent to the
+    // slave before we exit the process. Without this, we
+    // may exit before libprocess has sent the data over
+    // the socket. See MESOS-4111.
+    os::sleep(Seconds(1));
+    driver->stop();
+  }
+
+  void escalated(Duration timeout)
+  {
+    cout << "Process " << pid << " did not terminate after " << timeout
+         << ", sending SIGKILL to process tree at " << pid << endl;
+
+    // TODO(nnielsen): Sending SIGTERM in the first stage of the
+    // shutdown may leave orphan processes hanging off init. This
+    // scenario will be handled when PID namespace encapsulated
+    // execution is in place.
+    Try<std::list<os::ProcessTree> > trees =
+      os::killtree(pid, SIGKILL, true, true);
+
+    if (trees.isError()) {
+      cerr << "Failed to kill the process tree rooted at pid "
+           << pid << ": " << trees.error() << endl;
+
+      // Process 'pid' may not have received signal before
+      // os::killtree() failed. To make sure process 'pid' is reaped
+      // we send SIGKILL directly.
+      ::kill(pid, SIGKILL);
+    } else {
+      cout << "Killed the following process trees:\n" << stringify(trees.get())
+           << endl;
+    }
+  }
+
+  void launchHealthCheck(const TaskInfo& task)
+  {
+    CHECK(task.has_health_check());
+
+    JSON::Object json = JSON::protobuf(task.health_check());
+
+    // Launch the subprocess using 'exec' style so that quotes can
+    // be properly handled.
+    vector<string> argv(4);
+    argv[0] = "mesos-health-check";
+    argv[1] = "--executor=" + stringify(self());
+    argv[2] = "--health_check_json=" + stringify(json);
+    argv[3] = "--task_id=" + task.task_id().value();
+
+    cout << "Launching health check process: "
+         << path::join(healthCheckDir, "mesos-health-check")
+         << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
+
+    Try<Subprocess> healthProcess =
+      process::subprocess(
+        path::join(healthCheckDir, "mesos-health-check"),
+        argv,
+        // Intentionally not sending STDIN to avoid health check
+        // commands that expect STDIN input to block.
+        Subprocess::PATH("/dev/null"),
+        Subprocess::FD(STDOUT_FILENO),
+        Subprocess::FD(STDERR_FILENO));
+
+    if (healthProcess.isError()) {
+      cerr << "Unable to launch health process: " << healthProcess.error();
+      return;
+    }
+
+    healthPid = healthProcess.get().pid();
+
+    cout << "Health check process launched at pid: "
+         << stringify(healthPid) << endl;
+  }
+
+  enum State
+  {
+    REGISTERING, // Executor is launched but not (re-)registered yet.
+    REGISTERED,  // Executor has (re-)registered.
+  } state;
+
+  bool launched;
+  bool killed;
+  bool killedByHealthCheck;
+  pid_t pid;
+  pid_t healthPid;
+  Duration shutdownGracePeriod;
+  Option<KillPolicy> killPolicy;
+  Timer escalationTimer;
+  Option<ExecutorDriver*> driver;
+  Option<FrameworkInfo> frameworkInfo;
+  Option<TaskID> taskId;
+  string healthCheckDir;
+  Option<char**> override;
+  Option<string> sandboxDirectory;
+  Option<string> workingDirectory;
+  Option<string> user;
+  Option<string> taskCommand;
+};
+
+
+class CommandExecutor: public Executor
+{
+public:
+  CommandExecutor(
+      const Option<char**>& override,
+      const string& healthCheckDir,
+      const Option<string>& sandboxDirectory,
+      const Option<string>& workingDirectory,
+      const Option<string>& user,
+      const Option<string>& taskCommand,
+      const Duration& shutdownGracePeriod)
+  {
+    process = new CommandExecutorProcess(
+        override,
+        healthCheckDir,
+        sandboxDirectory,
+        workingDirectory,
+        user,
+        taskCommand,
+        shutdownGracePeriod);
+
+    spawn(process);
+  }
+
+  virtual ~CommandExecutor()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  virtual void registered(
+        ExecutorDriver* driver,
+        const ExecutorInfo& executorInfo,
+        const FrameworkInfo& frameworkInfo,
+        const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::registered,
+             driver,
+             executorInfo,
+             frameworkInfo,
+             slaveInfo);
+  }
+
+  virtual void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(process,
+             &CommandExecutorProcess::reregistered,
+             driver,
+             slaveInfo);
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::disconnected, driver);
+  }
+
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::shutdown, driver);
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::error, driver, data);
+  }
+
+private:
+  CommandExecutorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+
+class Flags : public flags::FlagsBase
+{
+public:
+  Flags()
+  {
+    // TODO(gilbert): Deprecate the 'override' flag since no one is
+    // using it, and it may cause confusing with 'task_command' flag.
+    add(&override,
+        "override",
+        "Whether to override the command the executor should run when the\n"
+        "task is launched. Only this flag is expected to be on the command\n"
+        "line and all arguments after the flag will be used as the\n"
+        "subsequent 'argv' to be used with 'execvp'",
+        false);
+
+    // The following flags are only applicable when a rootfs is
+    // provisioned for this command.
+    add(&sandbox_directory,
+        "sandbox_directory",
+        "The absolute path for the directory in the container where the\n"
+        "sandbox is mapped to");
+
+    add(&working_directory,
+        "working_directory",
+        "The working directory for the task in the container.");
+
+    add(&user,
+        "user",
+        "The user that the task should be running as.");
+
+    add(&task_command,
+        "task_command",
+        "If specified, this is the overrided command for launching the\n"
+        "task (instead of the command from TaskInfo).");
+
+    // TODO(nnielsen): Add 'prefix' option to enable replacing
+    // 'sh -c' with user specified wrapper.
+  }
+
+  bool override;
+  Option<string> sandbox_directory;
+  Option<string> working_directory;
+  Option<string> user;
+  Option<string> task_command;
+};
+
+
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  // Load flags from command line.
+  Try<Nothing> load = flags.load(None(), &argc, &argv);
+
+  if (load.isError()) {
+    cerr << flags.usage(load.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  // After flags.load(..., &argc, &argv) all flags will have been
+  // stripped from argv. Additionally, arguments after a "--"
+  // terminator will be preservered in argv and it is therefore
+  // possible to pass override and prefix commands which use
+  // "--foobar" style flags.
+  Option<char**> override = None();
+  if (flags.override) {
+    if (argc > 1) {
+      override = argv + 1;
+    }
+  }
+
+  const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
+
+  string path = envPath.isSome()
+    ? envPath.get()
+    : os::realpath(Path(argv[0]).dirname()).get();
+
+  // Get executor shutdown grace period from the environment.
+  //
+  // NOTE: We avoided introducing a command executor flag for this
+  // because the command executor exits if it sees an unknown flag.
+  // This makes it difficult to add or remove command executor flags
+  // that are unconditionally set by the agent.
+  Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
+  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  if (value.isSome()) {
+    Try<Duration> parse = Duration::parse(value.get());
+    if (parse.isError()) {
+      cerr << "Failed to parse value '" << value.get() << "'"
+           << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error();
+      return EXIT_FAILURE;
+    }
+
+    shutdownGracePeriod = parse.get();
+  }
+
+  mesos::internal::CommandExecutor executor(
+      override,
+      path,
+      flags.sandbox_directory,
+      flags.working_directory,
+      flags.user,
+      flags.task_command,
+      shutdownGracePeriod);
+
+  mesos::MesosExecutorDriver driver(&executor);
+
+  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+}