You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/14 01:27:48 UTC
[1/4] mesos git commit: Updated tests for HTTP command executor.
Repository: mesos
Updated Branches:
refs/heads/master e492b5427 -> 124a05b4f
Updated tests for HTTP command executor.
Review: https://reviews.apache.org/r/45670/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/124a05b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/124a05b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/124a05b4
Branch: refs/heads/master
Commit: 124a05b4fe650ddd1ff08e2a616fe4ee4ba8bd5f
Parents: df24b67
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:34 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700
----------------------------------------------------------------------
src/tests/command_executor_tests.cpp | 30 +++++++++++++++++++++++++-----
1 file changed, 25 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/124a05b4/src/tests/command_executor_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/command_executor_tests.cpp b/src/tests/command_executor_tests.cpp
index 843233a..07e5eb4 100644
--- a/src/tests/command_executor_tests.cpp
+++ b/src/tests/command_executor_tests.cpp
@@ -47,6 +47,8 @@ using process::PID;
using std::vector;
+using ::testing::WithParamInterface;
+
namespace mesos {
namespace internal {
namespace tests {
@@ -54,18 +56,32 @@ namespace tests {
// Tests that exercise the command executor implementation
// should be located in this file.
-class CommandExecutorTest : public MesosTest {};
+class CommandExecutorTest
+ : public MesosTest,
+ public WithParamInterface<bool> {};
+
+
+// The Command Executor tests are parameterized by the underlying library
+// used by the executor (e.g., driver or the HTTP based executor library).
+INSTANTIATE_TEST_CASE_P(
+ HTTPCommandExecutor,
+ CommandExecutorTest,
+ ::testing::Bool());
// This test ensures that the command executor does not send
// TASK_KILLING to frameworks that do not support the capability.
-TEST_F(CommandExecutorTest, NoTaskKillingCapability)
+TEST_P(CommandExecutorTest, NoTaskKillingCapability)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.http_command_executor = GetParam();
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
// Start the framework without the task killing capability.
@@ -117,13 +133,17 @@ TEST_F(CommandExecutorTest, NoTaskKillingCapability)
// This test ensures that the command executor sends TASK_KILLING
// to frameworks that support the capability.
-TEST_F(CommandExecutorTest, TaskKillingCapability)
+TEST_P(CommandExecutorTest, TaskKillingCapability)
{
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.http_command_executor = GetParam();
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
ASSERT_SOME(slave);
// Start the framework with the task killing capability.
[3/4] mesos git commit: Added --http_command_executor flag.
Posted by vi...@apache.org.
Added --http_command_executor flag.
Review: https://reviews.apache.org/r/44427/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/df24b670
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/df24b670
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/df24b670
Branch: refs/heads/master
Commit: df24b6700aa79448782eab6277039d9129f9a17e
Parents: bec4d71
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:29 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700
----------------------------------------------------------------------
docs/configuration.md | 13 +++++++++++++
src/slave/flags.cpp | 10 ++++++++++
src/slave/flags.hpp | 1 +
src/slave/slave.cpp | 28 ++++++++++++++++++++++++----
4 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index ba00ec5..ce51f26 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1290,6 +1290,19 @@ Example:
</tr>
<tr>
<td>
+ --[no-]http_command_executor
+ </td>
+ <td>
+The underlying executor library to be used for the command executor.
+If set to <code>true</code>, the command executor would use the HTTP based
+executor library to interact with the Mesos agent. If set to <code>false</code>,
+the driver based implementation would be used.
+<b>NOTE</b>: This flag is *experimental* and should not be used in
+production yet. (default: false)
+ </td>
+</tr>
+<tr>
+ <td>
--image_providers=VALUE
</td>
<td>
http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/flags.cpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.cpp b/src/slave/flags.cpp
index dd7bc9a..316feec 100644
--- a/src/slave/flags.cpp
+++ b/src/slave/flags.cpp
@@ -783,4 +783,14 @@ mesos::internal::slave::Flags::Flags()
"The ranges of XFS project IDs to use for tracking directory quotas",
"[5000-10000]");
#endif
+
+ add(&Flags::http_command_executor,
+ "http_command_executor",
+ "The underlying executor library to be used for the command executor.\n"
+ "If set to `true`, the command executor would use the HTTP based\n"
+ "executor library to interact with the Mesos agent. If set to `false`,\n"
+ "the driver based implementation would be used.\n"
+ "NOTE: This flag is *experimental* and should not be used in\n"
+ "production yet.",
+ false);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/flags.hpp
----------------------------------------------------------------------
diff --git a/src/slave/flags.hpp b/src/slave/flags.hpp
index 300db49..ee520ac 100644
--- a/src/slave/flags.hpp
+++ b/src/slave/flags.hpp
@@ -147,6 +147,7 @@ public:
#if ENABLE_XFS_DISK_ISOLATOR
std::string xfs_project_range;
#endif
+ bool http_command_executor;
};
} // namespace slave {
http://git-wip-us.apache.org/repos/asf/mesos/blob/df24b670/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6ee277d..49fa4a0 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3898,8 +3898,14 @@ ExecutorInfo Slave::getExecutorInfo(
executor.mutable_command()->set_user(task.command().user());
}
- Result<string> path =
- os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
+ Result<string> path = None();
+ if (flags.http_command_executor) {
+ path =
+ os::realpath(path::join(flags.launcher_dir, "mesos-http-executor"));
+ } else {
+ path =
+ os::realpath(path::join(flags.launcher_dir, "mesos-executor"));
+ }
// Explicitly set 'shell' to true since we want to use the shell
// for running the mesos-executor (and even though this is the
@@ -3909,7 +3915,11 @@ ExecutorInfo Slave::getExecutorInfo(
if (path.isSome()) {
if (hasRootfs) {
executor.mutable_command()->set_shell(false);
- executor.mutable_command()->add_arguments("mesos-executor");
+ if (flags.http_command_executor) {
+ executor.mutable_command()->add_arguments("mesos-http-executor");
+ } else {
+ executor.mutable_command()->add_arguments("mesos-executor");
+ }
executor.mutable_command()->add_arguments(
"--sandbox_directory=" + flags.sandbox_directory);
@@ -5864,12 +5874,22 @@ Executor::Executor(
{
CHECK_NOTNULL(slave);
+ // See if this is driver based command executor.
Result<string> executorPath =
os::realpath(path::join(slave->flags.launcher_dir, "mesos-executor"));
if (executorPath.isSome()) {
commandExecutor =
- strings::contains(info.command().value(), executorPath.get());
+ strings::contains(info.command().value(), executorPath.get());
+ }
+
+ // See if this is HTTP based command executor.
+ if (!commandExecutor) {
+ executorPath = os::realpath(
+ path::join(slave->flags.launcher_dir, "mesos-http-executor"));
+
+ commandExecutor =
+ strings::contains(info.command().value(), executorPath.get());
}
}
[2/4] mesos git commit: Updated http_command_executor.cpp to use v1 API.
Posted by vi...@apache.org.
Updated http_command_executor.cpp to use v1 API.
Review: https://reviews.apache.org/r/44424/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bec4d711
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bec4d711
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bec4d711
Branch: refs/heads/master
Commit: bec4d711c3fc190138d100023bba9e3fe087052e
Parents: ed30403
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:25 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700
----------------------------------------------------------------------
src/launcher/http_command_executor.cpp | 539 +++++++++++++++-------------
1 file changed, 295 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bec4d711/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
index 7677391..ad484e0 100644
--- a/src/launcher/http_command_executor.cpp
+++ b/src/launcher/http_command_executor.cpp
@@ -24,7 +24,9 @@
#include <string>
#include <vector>
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
#include <mesos/type_utils.hpp>
#include <process/defer.hpp>
@@ -41,6 +43,7 @@
#include <stout/flags.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
@@ -50,6 +53,8 @@
#include "common/http.hpp"
#include "common/status_utils.hpp"
+#include "internal/evolve.hpp"
+
#ifdef __linux__
#include "linux/fs.hpp"
#endif
@@ -60,101 +65,230 @@
#include "slave/constants.hpp"
-using namespace mesos::internal::slave;
+#ifdef __linux__
+namespace fs = mesos::internal::fs;
+#endif
-using process::wait; // Necessary on some OS's to disambiguate.
+using namespace mesos::internal::slave;
using std::cout;
using std::cerr;
using std::endl;
+using std::queue;
using std::string;
using std::vector;
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+using process::Timer;
+
+using mesos::internal::evolve;
+using mesos::internal::TaskHealthStatus;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
namespace mesos {
+namespace v1 {
namespace internal {
-using namespace process;
-
-class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor>
{
public:
- CommandExecutorProcess(
- const Option<char**>& override,
+ HttpCommandExecutor(
+ const Option<char**>& _override,
const string& _healthCheckDir,
const Option<string>& _sandboxDirectory,
const Option<string>& _workingDirectory,
const Option<string>& _user,
const Option<string>& _taskCommand,
+ const FrameworkID& _frameworkId,
+ const ExecutorID& _executorId,
const Duration& _shutdownGracePeriod)
- : state(REGISTERING),
- launched(false),
- killed(false),
- killedByHealthCheck(false),
- pid(-1),
- healthPid(-1),
- shutdownGracePeriod(_shutdownGracePeriod),
- driver(None()),
- frameworkInfo(None()),
- taskId(None()),
- healthCheckDir(_healthCheckDir),
- override(override),
- sandboxDirectory(_sandboxDirectory),
- workingDirectory(_workingDirectory),
- user(_user),
- taskCommand(_taskCommand) {}
-
- virtual ~CommandExecutorProcess() {}
-
- void registered(
- ExecutorDriver* _driver,
- const ExecutorInfo& _executorInfo,
- const FrameworkInfo& _frameworkInfo,
- const SlaveInfo& slaveInfo)
+ : state(DISCONNECTED),
+ launched(false),
+ killed(false),
+ killedByHealthCheck(false),
+ pid(-1),
+ healthPid(-1),
+ shutdownGracePeriod(_shutdownGracePeriod),
+ frameworkInfo(None()),
+ taskId(None()),
+ healthCheckDir(_healthCheckDir),
+ override(_override),
+ sandboxDirectory(_sandboxDirectory),
+ workingDirectory(_workingDirectory),
+ user(_user),
+ taskCommand(_taskCommand),
+ frameworkId(_frameworkId),
+ executorId(_executorId),
+ task(None()) {}
+
+ virtual ~HttpCommandExecutor() = default;
+
+ void connected()
{
- CHECK_EQ(REGISTERING, state);
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void received(queue<Event> events)
+ {
+ while (!events.empty()) {
+ Event event = events.front();
+ events.pop();
+
+ cout << "Received " << event.type() << " event" << endl;
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Subscribed executor on "
+ << event.subscribed().agent_info().hostname() << endl;
+
+ frameworkInfo = event.subscribed().framework_info();
+ state = SUBSCRIBED;
+ break;
+ }
- cout << "Registered executor on " << slaveInfo.hostname() << endl;
+ case Event::LAUNCH: {
+ launch(event.launch().task());
+ break;
+ }
- driver = _driver;
- frameworkInfo = _frameworkInfo;
+ case Event::KILL: {
+ kill(event.kill().task_id());
+ break;
+ }
- state = REGISTERED;
+ case Event::ACKNOWLEDGED: {
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+ // Remove the corresponding task.
+ task = None();
+ break;
+ }
+
+ case Event::SHUTDOWN: {
+ shutdown();
+ break;
+ }
+
+ case Event::MESSAGE: {
+ break;
+ }
+
+ case Event::ERROR: {
+ cerr << "Error: " << event.error().message() << endl;
+ }
+
+ default: {
+ UNREACHABLE();
+ }
+ }
+ }
}
- void reregistered(
- ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
+protected:
+ virtual void initialize()
{
- CHECK(state == REGISTERED || state == REGISTERING) << state;
+ // TODO(qianzhang): Currently, the `mesos-health-check` binary can only
+ // send unversioned `TaskHealthStatus` messages. This needs to be revisited
+ // as part of MESOS-5103.
+ install<TaskHealthStatus>(
+ &HttpCommandExecutor::taskHealthUpdated,
+ &TaskHealthStatus::task_id,
+ &TaskHealthStatus::healthy,
+ &TaskHealthStatus::kill_task);
- cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+ // We initialize the library here to ensure that callbacks are only invoked
+ // after the process has spawned.
+ mesos.reset(new Mesos(
+ mesos::ContentType::PROTOBUF,
+ defer(self(), &Self::connected),
+ defer(self(), &Self::disconnected),
+ defer(self(), &Self::received, lambda::_1)));
+ }
- state = REGISTERED;
+ void taskHealthUpdated(
+ const mesos::TaskID& taskID,
+ const bool healthy,
+ const bool initiateTaskKill)
+ {
+ cout << "Received task health update, healthy: "
+ << stringify(healthy) << endl;
+
+ update(evolve(taskID), TASK_RUNNING, healthy);
+
+ if (initiateTaskKill) {
+ killedByHealthCheck = true;
+ kill(evolve(taskID));
+ }
}
- void disconnected(ExecutorDriver* driver) {}
+ void doReliableRegistration()
+ {
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
- void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ // Send all unacknowledged updates.
+ foreach (const Call::Update& update, updates.values()) {
+ subscribe->add_unacknowledged_updates()->MergeFrom(update);
+ }
+
+ // Send the unacknowledged task.
+ if (task.isSome()) {
+ subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
+ }
+
+ mesos->send(call);
+
+ delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void launch(const TaskInfo& _task)
{
- CHECK_EQ(REGISTERED, state);
+ CHECK_EQ(SUBSCRIBED, state);
if (launched) {
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_FAILED);
- status.set_message(
+ update(
+ _task.task_id(),
+ TASK_FAILED,
+ None(),
"Attempted to run multiple tasks using a \"command\" executor");
-
- driver->sendStatusUpdate(status);
return;
}
+ // Capture the task.
+ task = _task;
+
// Capture the TaskID.
- taskId = task.task_id();
+ taskId = task->task_id();
// Capture the kill policy.
- if (task.has_kill_policy()) {
- killPolicy = task.kill_policy();
+ if (task->has_kill_policy()) {
+ killPolicy = task->kill_policy();
}
// Determine the command to launch the task.
@@ -175,11 +309,11 @@ public:
}
command = parse.get();
- } else if (task.has_command()) {
- command = task.command();
+ } else if (task->has_command()) {
+ command = task->command();
} else {
CHECK_SOME(override)
- << "Expecting task '" << task.task_id()
+ << "Expecting task '" << task->task_id()
<< "' to have a command!";
}
@@ -191,16 +325,16 @@ public:
// correct solution is to perform this validation at master side.
if (command.shell()) {
CHECK(command.has_value())
- << "Shell command of task '" << task.task_id()
+ << "Shell command of task '" << task->task_id()
<< "' is not specified!";
} else {
CHECK(command.has_value())
- << "Executable of task '" << task.task_id()
+ << "Executable of task '" << task->task_id()
<< "' is not specified!";
}
}
- cout << "Starting task " << task.task_id() << endl;
+ cout << "Starting task " << task->task_id() << endl;
// TODO(benh): Clean this up with the new 'Fork' abstraction.
// Use pipes to determine which child has successfully changed
@@ -448,25 +582,22 @@ public:
cout << "Forked command at " << pid << endl;
- if (task.has_health_check()) {
- launchHealthCheck(task);
+ if (task->has_health_check()) {
+ launchHealthCheck(task.get());
}
// Monitor this process.
process::reap(pid)
- .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+ .onAny(defer(self(), &Self::reaped, pid, lambda::_1));
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_RUNNING);
- driver->sendStatusUpdate(status);
+ update(task->task_id(), TASK_RUNNING);
launched = true;
}
- void killTask(ExecutorDriver* driver, const TaskID& taskId)
+ void kill(const TaskID& taskId)
{
- cout << "Received killTask for task " << taskId.value() << endl;
+ cout << "Received kill for task " << taskId.value() << endl;
// Default grace period is set to 3s for backwards compatibility.
//
@@ -478,12 +609,10 @@ public:
gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
}
- killTask(driver, taskId, gracePeriod);
+ kill(taskId, gracePeriod);
}
- void frameworkMessage(ExecutorDriver* driver, const string& data) {}
-
- void shutdown(ExecutorDriver* driver)
+ void shutdown()
{
cout << "Shutting down" << endl;
@@ -504,53 +633,12 @@ public:
// agent is smaller than the kill grace period).
if (launched) {
CHECK_SOME(taskId);
- killTask(driver, taskId.get(), gracePeriod);
- } else {
- driver->stop();
- }
- }
-
- virtual void error(ExecutorDriver* driver, const string& message) {}
-
-protected:
- virtual void initialize()
- {
- install<TaskHealthStatus>(
- &CommandExecutorProcess::taskHealthUpdated,
- &TaskHealthStatus::task_id,
- &TaskHealthStatus::healthy,
- &TaskHealthStatus::kill_task);
- }
-
- void taskHealthUpdated(
- const TaskID& taskID,
- const bool& healthy,
- const bool& initiateTaskKill)
- {
- if (driver.isNone()) {
- return;
- }
-
- cout << "Received task health update, healthy: "
- << stringify(healthy) << endl;
-
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskID);
- status.set_healthy(healthy);
- status.set_state(TASK_RUNNING);
- driver.get()->sendStatusUpdate(status);
-
- if (initiateTaskKill) {
- killedByHealthCheck = true;
- killTask(driver.get(), taskID);
+ kill(taskId.get(), gracePeriod);
}
}
private:
- void killTask(
- ExecutorDriver* driver,
- const TaskID& _taskId,
- const Duration& gracePeriod)
+ void kill(const TaskID& _taskId, const Duration& gracePeriod)
{
if (launched && !killed) {
// Send TASK_KILLING if the framework can handle it.
@@ -561,10 +649,7 @@ private:
foreach (const FrameworkInfo::Capability& c,
frameworkInfo->capabilities()) {
if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskId.get());
- status.set_state(TASK_KILLING);
- driver->sendStatusUpdate(status);
+ update(taskId.get(), TASK_KILLING);
break;
}
}
@@ -606,10 +691,7 @@ private:
}
}
- void reaped(
- ExecutorDriver* driver,
- pid_t pid,
- const Future<Option<int> >& status_)
+ void reaped(pid_t pid, const Future<Option<int> >& status_)
{
TaskState taskState;
string message;
@@ -632,7 +714,7 @@ private:
taskState = TASK_FINISHED;
} else if (killed) {
// Send TASK_KILLED if the task was killed as a result of
- // killTask() or shutdown().
+ // kill() or shutdown().
taskState = TASK_KILLED;
} else {
taskState = TASK_FAILED;
@@ -645,22 +727,17 @@ private:
CHECK_SOME(taskId);
- TaskStatus taskStatus;
- taskStatus.mutable_task_id()->MergeFrom(taskId.get());
- taskStatus.set_state(taskState);
- taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
- taskStatus.set_healthy(false);
+ update(taskId.get(), taskState, false, message);
+ } else {
+ update(taskId.get(), taskState, None(), message);
}
- driver->sendStatusUpdate(taskStatus);
-
- // This is a hack to ensure the message is sent to the
- // slave before we exit the process. Without this, we
- // may exit before libprocess has sent the data over
- // the socket. See MESOS-4111.
+ // TODO(qianzhang): Remove this hack since the executor now receives
+ // acknowledgements for status updates. The executor can terminate
+ // after it receives an ACK for a terminal status update.
os::sleep(Seconds(1));
- driver->stop();
+ terminate(self());
}
void escalated(Duration timeout)
@@ -728,10 +805,49 @@ private:
<< stringify(healthPid) << endl;
}
+ void update(
+ const TaskID& taskID,
+ const TaskState& state,
+ const Option<bool>& healthy = None(),
+ const Option<string>& message = None())
+ {
+ UUID uuid = UUID::random();
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskID);
+ status.mutable_executor_id()->CopyFrom(executorId);
+
+ status.set_state(state);
+ status.set_source(TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(uuid.toBytes());
+
+ if (healthy.isSome()) {
+ status.set_healthy(healthy.get());
+ }
+
+ if (message.isSome()) {
+ status.set_message(message.get());
+ }
+
+ Call call;
+ call.set_type(Call::UPDATE);
+
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ // Capture the status update.
+ updates[uuid] = call.update();
+
+ mesos->send(call);
+ }
+
enum State
{
- REGISTERING, // Executor is launched but not (re-)registered yet.
- REGISTERED, // Executor has (re-)registered.
+ CONNECTED,
+ DISCONNECTED,
+ SUBSCRIBED
} state;
bool launched;
@@ -742,7 +858,6 @@ private:
Duration shutdownGracePeriod;
Option<KillPolicy> killPolicy;
Timer escalationTimer;
- Option<ExecutorDriver*> driver;
Option<FrameworkInfo> frameworkInfo;
Option<TaskID> taskId;
string healthCheckDir;
@@ -751,99 +866,15 @@ private:
Option<string> workingDirectory;
Option<string> user;
Option<string> taskCommand;
-};
-
-
-class CommandExecutor: public Executor
-{
-public:
- CommandExecutor(
- const Option<char**>& override,
- const string& healthCheckDir,
- const Option<string>& sandboxDirectory,
- const Option<string>& workingDirectory,
- const Option<string>& user,
- const Option<string>& taskCommand,
- const Duration& shutdownGracePeriod)
- {
- process = new CommandExecutorProcess(
- override,
- healthCheckDir,
- sandboxDirectory,
- workingDirectory,
- user,
- taskCommand,
- shutdownGracePeriod);
-
- spawn(process);
- }
-
- virtual ~CommandExecutor()
- {
- terminate(process);
- wait(process);
- delete process;
- }
-
- virtual void registered(
- ExecutorDriver* driver,
- const ExecutorInfo& executorInfo,
- const FrameworkInfo& frameworkInfo,
- const SlaveInfo& slaveInfo)
- {
- dispatch(process,
- &CommandExecutorProcess::registered,
- driver,
- executorInfo,
- frameworkInfo,
- slaveInfo);
- }
-
- virtual void reregistered(
- ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
- {
- dispatch(process,
- &CommandExecutorProcess::reregistered,
- driver,
- slaveInfo);
- }
-
- virtual void disconnected(ExecutorDriver* driver)
- {
- dispatch(process, &CommandExecutorProcess::disconnected, driver);
- }
-
- virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
- {
- dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
- }
-
- virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
- {
- dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
- }
-
- virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
- {
- dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
- }
-
- virtual void shutdown(ExecutorDriver* driver)
- {
- dispatch(process, &CommandExecutorProcess::shutdown, driver);
- }
-
- virtual void error(ExecutorDriver* driver, const string& data)
- {
- dispatch(process, &CommandExecutorProcess::error, driver, data);
- }
-
-private:
- CommandExecutorProcess* process;
+ const FrameworkID frameworkId;
+ const ExecutorID executorId;
+ Owned<Mesos> mesos;
+ LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+ Option<TaskInfo> task; // Unacknowledged task.
};
} // namespace internal {
+} // namespace v1 {
} // namespace mesos {
@@ -897,6 +928,8 @@ public:
int main(int argc, char** argv)
{
Flags flags;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
// Load flags from command line.
Try<Nothing> load = flags.load(None(), &argc, &argv);
@@ -929,6 +962,20 @@ int main(int argc, char** argv)
? envPath.get()
: os::realpath(Path(argv[0]).dirname()).get();
+ Option<string> value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+ }
+ frameworkId.set_value(value.get());
+
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+ }
+ executorId.set_value(value.get());
+
// Get executor shutdown grace period from the environment.
//
// NOTE: We avoided introducing a command executor flag for this
@@ -936,7 +983,7 @@ int main(int argc, char** argv)
// This makes it difficult to add or remove command executor flags
// that are unconditionally set by the agent.
Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
- Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+ value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
if (value.isSome()) {
Try<Duration> parse = Duration::parse(value.get());
if (parse.isError()) {
@@ -948,16 +995,20 @@ int main(int argc, char** argv)
shutdownGracePeriod = parse.get();
}
- mesos::internal::CommandExecutor executor(
- override,
- path,
- flags.sandbox_directory,
- flags.working_directory,
- flags.user,
- flags.task_command,
- shutdownGracePeriod);
-
- mesos::MesosExecutorDriver driver(&executor);
-
- return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+ Owned<mesos::v1::internal::HttpCommandExecutor> executor(
+ new mesos::v1::internal::HttpCommandExecutor(
+ override,
+ path,
+ flags.sandbox_directory,
+ flags.working_directory,
+ flags.user,
+ flags.task_command,
+ frameworkId,
+ executorId,
+ shutdownGracePeriod));
+
+ process::spawn(executor.get());
+ process::wait(executor.get());
+
+ return EXIT_SUCCESS;
}
[4/4] mesos git commit: Added HTTP command executor to make files.
Posted by vi...@apache.org.
Added HTTP command executor to make files.
Added HTTP command executor to make files. For now, the content of
http_command_executor.cpp is identical to executor.cpp; the new file
will be updated in a subsequent review.
Review: https://reviews.apache.org/r/44423/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed304030
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed304030
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed304030
Branch: refs/heads/master
Commit: ed304030fc5a36d1c1f13e505d6b56f928a81cd4
Parents: e492b54
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:21 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700
----------------------------------------------------------------------
src/Makefile.am | 5 +
src/launcher/http_command_executor.cpp | 963 ++++++++++++++++++++++++++++
2 files changed, 968 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index a8f6831..139935f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1181,6 +1181,11 @@ mesos_executor_SOURCES = launcher/executor.cpp
mesos_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
mesos_executor_LDADD = libmesos.la $(LDADD)
+pkglibexec_PROGRAMS += mesos-http-executor
+mesos_http_executor_SOURCES = launcher/http_command_executor.cpp
+mesos_http_executor_CPPFLAGS = $(MESOS_CPPFLAGS)
+mesos_http_executor_LDADD = libmesos.la $(LDADD)
+
pkglibexec_PROGRAMS += mesos-containerizer
mesos_containerizer_SOURCES = slave/containerizer/mesos/main.cpp
mesos_containerizer_CPPFLAGS = $(MESOS_CPPFLAGS)
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed304030/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
new file mode 100644
index 0000000..7677391
--- /dev/null
+++ b/src/launcher/http_command_executor.cpp
@@ -0,0 +1,963 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <signal.h>
+#include <stdio.h>
+
+#include <sys/wait.h>
+
+#include <iostream>
+#include <list>
+#include <string>
+#include <vector>
+
+#include <mesos/executor.hpp>
+#include <mesos/type_utils.hpp>
+
+#include <process/defer.hpp>
+#include <process/delay.hpp>
+#include <process/future.hpp>
+#include <process/io.hpp>
+#include <process/process.hpp>
+#include <process/protobuf.hpp>
+#include <process/subprocess.hpp>
+#include <process/reap.hpp>
+#include <process/timer.hpp>
+
+#include <stout/duration.hpp>
+#include <stout/flags.hpp>
+#include <stout/json.hpp>
+#include <stout/lambda.hpp>
+#include <stout/option.hpp>
+#include <stout/os.hpp>
+#include <stout/path.hpp>
+#include <stout/protobuf.hpp>
+#include <stout/strings.hpp>
+
+#include "common/http.hpp"
+#include "common/status_utils.hpp"
+
+#ifdef __linux__
+#include "linux/fs.hpp"
+#endif
+
+#include "logging/logging.hpp"
+
+#include "messages/messages.hpp"
+
+#include "slave/constants.hpp"
+
+using namespace mesos::internal::slave;
+
+using process::wait; // Necessary on some OS's to disambiguate.
+
+using std::cout;
+using std::cerr;
+using std::endl;
+using std::string;
+using std::vector;
+
+namespace mesos {
+namespace internal {
+
+using namespace process;
+
+class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+{
+public:
+ // Constructs the process in the REGISTERING state with no task
+ // launched or killed yet; `pid` and `healthPid` start out as -1.
+ //
+ // `override`, when set, is the argv that `launchTask()` hands to
+ // `execvp` instead of the task's own command. `_healthCheckDir` is
+ // the directory that is joined with "mesos-health-check" to find the
+ // health checker binary. `_sandboxDirectory`, `_workingDirectory`
+ // and `_user` are consulted by `launchTask()` when the task runs
+ // inside a root filesystem. `_taskCommand`, when set, is a JSON
+ // encoded CommandInfo that takes precedence over the command in the
+ // TaskInfo. `_shutdownGracePeriod` is the budget from which
+ // `shutdown()` derives the kill grace period.
+ CommandExecutorProcess(
+ const Option<char**>& override,
+ const string& _healthCheckDir,
+ const Option<string>& _sandboxDirectory,
+ const Option<string>& _workingDirectory,
+ const Option<string>& _user,
+ const Option<string>& _taskCommand,
+ const Duration& _shutdownGracePeriod)
+ : state(REGISTERING),
+ launched(false),
+ killed(false),
+ killedByHealthCheck(false),
+ pid(-1),
+ healthPid(-1),
+ shutdownGracePeriod(_shutdownGracePeriod),
+ driver(None()),
+ frameworkInfo(None()),
+ taskId(None()),
+ healthCheckDir(_healthCheckDir),
+ // NOTE: The parameter has the same name as the member; in a
+ // member initializer the name outside the parentheses is the
+ // member and the name inside is the parameter.
+ override(override),
+ sandboxDirectory(_sandboxDirectory),
+ workingDirectory(_workingDirectory),
+ user(_user),
+ taskCommand(_taskCommand) {}
+
+ // Nothing to release here: the destructor body is empty.
+ virtual ~CommandExecutorProcess() {}
+
+  // Registration callback: logs the agent's hostname, remembers the
+  // driver and the framework info for later use, and moves the
+  // executor from REGISTERING to REGISTERED.
+  void registered(
+      ExecutorDriver* _driver,
+      const ExecutorInfo& _executorInfo,
+      const FrameworkInfo& _frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    // A first registration is only valid from the initial state.
+    CHECK_EQ(REGISTERING, state);
+
+    cout << "Registered executor on " << slaveInfo.hostname() << endl;
+
+    state = REGISTERED;
+
+    // Kept for `taskHealthUpdated()` and `killTask()` respectively.
+    frameworkInfo = _frameworkInfo;
+    driver = _driver;
+  }
+
+  // Re-registration callback: accepted from either state, logs the
+  // agent's hostname and leaves the executor in REGISTERED.
+  void reregistered(ExecutorDriver* driver, const SlaveInfo& slaveInfo)
+  {
+    CHECK(state == REGISTERED || state == REGISTERING) << state;
+
+    state = REGISTERED;
+
+    cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+  }
+
+ // Disconnection callback. Intentionally a no-op: this executor takes
+ // no action when it is told the driver has disconnected.
+ void disconnected(ExecutorDriver* driver) {}
+
+  // Launches the single task this executor is responsible for.
+  //
+  // A second task is rejected with TASK_FAILED. Otherwise the command
+  // is chosen (the 'task_command' JSON if given, else the task's
+  // CommandInfo, else the 'override' argv), the sandbox is bind
+  // mounted into the root filesystem when a sandbox directory was
+  // supplied, and a child is forked that becomes a session leader
+  // before exec-ing the command. The parent learns the pid of that
+  // session leader over a pipe, starts the health checker if the task
+  // asks for one, begins reaping the child and reports TASK_RUNNING.
+  //
+  // Every setup failure is fatal: it is logged and followed by
+  // abort().
+  void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    CHECK_EQ(REGISTERED, state);
+
+    if (launched) {
+      TaskStatus status;
+      status.mutable_task_id()->MergeFrom(task.task_id());
+      status.set_state(TASK_FAILED);
+      status.set_message(
+          "Attempted to run multiple tasks using a \"command\" executor");
+
+      driver->sendStatusUpdate(status);
+      return;
+    }
+
+    // Capture the TaskID.
+    taskId = task.task_id();
+
+    // Capture the kill policy.
+    if (task.has_kill_policy()) {
+      killPolicy = task.kill_policy();
+    }
+
+    // Determine the command to launch the task.
+    CommandInfo command;
+
+    if (taskCommand.isSome()) {
+      // Get CommandInfo from a JSON string.
+      Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
+      if (object.isError()) {
+        cerr << "Failed to parse JSON: " << object.error() << endl;
+        abort();
+      }
+
+      Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+      if (parse.isError()) {
+        cerr << "Failed to parse protobuf: " << parse.error() << endl;
+        abort();
+      }
+
+      command = parse.get();
+    } else if (task.has_command()) {
+      command = task.command();
+    } else {
+      CHECK_SOME(override)
+        << "Expecting task '" << task.task_id()
+        << "' to have a command!";
+    }
+
+    if (override.isNone()) {
+      // TODO(jieyu): For now, we just fail the executor if the task's
+      // CommandInfo is not valid. The framework will receive
+      // TASK_FAILED for the task, and will most likely find out the
+      // cause with some debugging. This is a temporary solution. A more
+      // correct solution is to perform this validation at master side.
+      if (command.shell()) {
+        CHECK(command.has_value())
+          << "Shell command of task '" << task.task_id()
+          << "' is not specified!";
+      } else {
+        CHECK(command.has_value())
+          << "Executable of task '" << task.task_id()
+          << "' is not specified!";
+      }
+    }
+
+    cout << "Starting task " << task.task_id() << endl;
+
+    // TODO(benh): Clean this up with the new 'Fork' abstraction.
+    // Use pipes to determine which child has successfully changed
+    // session. This is needed as the setsid call can fail from other
+    // processes having the same group id.
+    int pipes[2];
+    if (pipe(pipes) < 0) {
+      perror("Failed to create a pipe");
+      abort();
+    }
+
+    // Set the FD_CLOEXEC flags on these pipes.
+    Try<Nothing> cloexec = os::cloexec(pipes[0]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[0]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    cloexec = os::cloexec(pipes[1]);
+    if (cloexec.isError()) {
+      cerr << "Failed to cloexec(pipe[1]): " << cloexec.error() << endl;
+      abort();
+    }
+
+    Option<string> rootfs;
+    if (sandboxDirectory.isSome()) {
+      // If 'sandbox_directory' is specified, that means the user
+      // task specifies a root filesystem, and that root filesystem has
+      // already been prepared at COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH.
+      // The command executor is responsible for mounting the sandbox
+      // into the root filesystem, chrooting into it and changing the
+      // user before exec-ing the user process.
+      //
+      // TODO(gilbert): Consider a better way to detect if a root
+      // filesystem is specified for the command task.
+#ifdef __linux__
+      // NOTE: This is the user the executor itself runs as, not the
+      // `user` member (the user the task should run as).
+      Result<string> currentUser = os::user();
+      if (currentUser.isError()) {
+        cerr << "Failed to get current user: " << currentUser.error() << endl;
+        abort();
+      } else if (currentUser.isNone()) {
+        cerr << "Current username is not found" << endl;
+        abort();
+      } else if (currentUser.get() != "root") {
+        cerr << "The command executor requires root with rootfs" << endl;
+        abort();
+      }
+
+      rootfs = path::join(
+          os::getcwd(), COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH);
+
+      string sandbox = path::join(rootfs.get(), sandboxDirectory.get());
+      if (!os::exists(sandbox)) {
+        Try<Nothing> mkdir = os::mkdir(sandbox);
+        if (mkdir.isError()) {
+          cerr << "Failed to create sandbox mount point at '"
+               << sandbox << "': " << mkdir.error() << endl;
+          abort();
+        }
+      }
+
+      // Mount the sandbox into the container rootfs.
+      // We need to perform a recursive mount because we want all the
+      // volume mounts in the sandbox to be also mounted in the container
+      // root filesystem. However, since the container root filesystem
+      // is also mounted in the sandbox, after the recursive mount we
+      // also need to unmount the root filesystem in the mounted sandbox.
+      Try<Nothing> mount = fs::mount(
+          os::getcwd(),
+          sandbox,
+          None(),
+          MS_BIND | MS_REC,
+          NULL);
+
+      if (mount.isError()) {
+        cerr << "Unable to mount the work directory into container "
+             << "rootfs: " << mount.error() << endl;
+        abort();
+      }
+
+      // Umount the root filesystem path in the mounted sandbox after
+      // the recursive mount.
+      Try<Nothing> unmountAll = fs::unmountAll(path::join(
+          sandbox,
+          COMMAND_EXECUTOR_ROOTFS_CONTAINER_PATH));
+      if (unmountAll.isError()) {
+        cerr << "Unable to unmount rootfs under mounted sandbox: "
+             << unmountAll.error() << endl;
+        abort();
+      }
+#else
+      cerr << "Not expecting root volume with non-linux platform." << endl;
+      abort();
+#endif // __linux__
+    }
+
+    // Prepare the argv before fork as it's not async signal safe.
+    char **argv = new char*[command.arguments().size() + 1];
+    for (int i = 0; i < command.arguments().size(); i++) {
+      argv[i] = (char*) command.arguments(i).c_str();
+    }
+    argv[command.arguments().size()] = NULL;
+
+    // Prepare the command log message.
+    string commandString;
+    if (override.isSome()) {
+      char** argv = override.get();
+      // argv is guaranteed to be NULL terminated and we rely on
+      // that fact to print command to be executed.
+      for (int i = 0; argv[i] != NULL; i++) {
+        commandString += string(argv[i]) + " ";
+      }
+    } else if (command.shell()) {
+      commandString = "sh -c '" + command.value() + "'";
+    } else {
+      commandString =
+        "[" + command.value() + ", " +
+        strings::join(", ", command.arguments()) + "]";
+    }
+
+    if ((pid = fork()) == -1) {
+      cerr << "Failed to fork to run " << commandString << ": "
+           << os::strerror(errno) << endl;
+      abort();
+    }
+
+    // TODO(jieyu): Make the child process async signal safe.
+    if (pid == 0) {
+      // In child process, we make cleanup easier by putting process
+      // into its own session.
+      os::close(pipes[0]);
+
+      // NOTE: We setsid() in a loop because setsid() might fail if another
+      // process has the same process group id as the calling process.
+      while ((pid = setsid()) == -1) {
+        perror("Could not put command in its own session, setsid");
+
+        cout << "Forking another process and retrying" << endl;
+
+        if ((pid = fork()) == -1) {
+          perror("Failed to fork to launch command");
+          abort();
+        }
+
+        if (pid > 0) {
+          // In parent process. It is ok to suicide here, because
+          // we're not watching this process.
+          exit(0);
+        }
+      }
+
+      // Tell the executor which process ended up as session leader.
+      if (write(pipes[1], &pid, sizeof(pid)) != sizeof(pid)) {
+        perror("Failed to write PID on pipe");
+        abort();
+      }
+
+      os::close(pipes[1]);
+
+      if (rootfs.isSome()) {
+#ifdef __linux__
+        if (user.isSome()) {
+          // This is a work around to fix the problem that after we chroot
+          // os::su call afterwards failed because the linker may not be
+          // able to find the necessary library in the rootfs.
+          // We call os::su before chroot here to force the linker to load
+          // into memory.
+          // We also assume it's safe to su to "root" user since
+          // filesystem/linux.cpp checks for root already.
+          os::su("root");
+        }
+
+        Try<Nothing> chroot = fs::chroot::enter(rootfs.get());
+        if (chroot.isError()) {
+          cerr << "Failed to enter chroot '" << rootfs.get()
+               << "': " << chroot.error() << endl;
+          abort();
+        }
+
+        // Determine the current working directory for the executor.
+        string cwd;
+        if (workingDirectory.isSome()) {
+          cwd = workingDirectory.get();
+        } else {
+          CHECK_SOME(sandboxDirectory);
+          cwd = sandboxDirectory.get();
+        }
+
+        Try<Nothing> chdir = os::chdir(cwd);
+        if (chdir.isError()) {
+          cerr << "Failed to chdir into current working directory '"
+               << cwd << "': " << chdir.error() << endl;
+          abort();
+        }
+
+        if (user.isSome()) {
+          Try<Nothing> su = os::su(user.get());
+          if (su.isError()) {
+            cerr << "Failed to change user to '" << user.get() << "': "
+                 << su.error() << endl;
+            abort();
+          }
+        }
+#else
+        cerr << "Rootfs is only supported on Linux" << endl;
+        abort();
+#endif // __linux__
+      }
+
+      cout << commandString << endl;
+
+      // The child has successfully setsid, now run the command.
+      if (override.isNone()) {
+        if (command.shell()) {
+          execlp(
+              "sh",
+              "sh",
+              "-c",
+              command.value().c_str(),
+              (char*) NULL);
+        } else {
+          execvp(command.value().c_str(), argv);
+        }
+      } else {
+        char** argv = override.get();
+        execvp(argv[0], argv);
+      }
+
+      // Only reached if the exec call itself failed.
+      perror("Failed to exec");
+      abort();
+    }
+
+    delete[] argv;
+
+    // In parent process.
+    os::close(pipes[1]);
+
+    // Get the child's pid via the pipe. The child writes exactly
+    // sizeof(pid) bytes, so anything else (including end-of-file
+    // because the child died before writing) means we do not know
+    // which process to monitor and must not go on with a stale pid.
+    // A read interrupted by a signal is simply retried.
+    ssize_t length;
+    do {
+      length = read(pipes[0], &pid, sizeof(pid));
+    } while (length == -1 && errno == EINTR);
+
+    if (length != static_cast<ssize_t>(sizeof(pid))) {
+      // Capture the reason first so that streaming cannot clobber errno.
+      const string reason = length == -1
+        ? os::strerror(errno)
+        : string("unexpected end of file");
+
+      cerr << "Failed to get child PID from pipe, read: " << reason << endl;
+      abort();
+    }
+
+    os::close(pipes[0]);
+
+    cout << "Forked command at " << pid << endl;
+
+    if (task.has_health_check()) {
+      launchHealthCheck(task);
+    }
+
+    // Monitor this process.
+    process::reap(pid)
+      .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+
+    TaskStatus status;
+    status.mutable_task_id()->MergeFrom(task.task_id());
+    status.set_state(TASK_RUNNING);
+    driver->sendStatusUpdate(status);
+
+    launched = true;
+  }
+
+  // Kill request for the task. Picks the grace period (the task's
+  // kill policy if it carries one, otherwise a fixed default) and
+  // hands over to the three-argument `killTask()`.
+  void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    cout << "Received killTask for task " << taskId.value() << endl;
+
+    const bool policyHasGracePeriod =
+      killPolicy.isSome() && killPolicy->has_grace_period();
+
+    if (policyHasGracePeriod) {
+      killTask(
+          driver,
+          taskId,
+          Nanoseconds(killPolicy->grace_period().nanoseconds()));
+      return;
+    }
+
+    // Default grace period is set to 3s for backwards compatibility.
+    //
+    // TODO(alexr): Replace it with a more meaningful default, e.g.
+    // `shutdownGracePeriod` after the deprecation cycle, started in 0.29.
+    killTask(driver, taskId, Seconds(3));
+  }
+
+ // Framework message callback. Intentionally a no-op: the message
+ // payload is ignored.
+ void frameworkMessage(ExecutorDriver* driver, const string& data) {}
+
+  // Shutdown request. If a task was launched it is killed with a grace
+  // period derived from `shutdownGracePeriod`; otherwise the driver is
+  // stopped right away.
+  void shutdown(ExecutorDriver* driver)
+  {
+    cout << "Shutting down" << endl;
+
+    // NOTE: We leave a small buffer of time to do the forced kill, otherwise
+    // the agent may destroy the container before we can send `TASK_KILLED`.
+    //
+    // TODO(alexr): Remove `MAX_REAP_INTERVAL` once the reaper signals
+    // immediately after the watched process has exited.
+    const Duration gracePeriod =
+      shutdownGracePeriod - process::MAX_REAP_INTERVAL() - Seconds(1);
+
+    if (!launched) {
+      // No task was ever started, so there is nothing to kill.
+      driver->stop();
+      return;
+    }
+
+    // Since the command executor manages a single task,
+    // shutdown boils down to killing this task.
+    //
+    // TODO(bmahler): If a shutdown arrives after a kill task within
+    // the grace period of the `KillPolicy`, we may need to escalate
+    // more quickly (e.g. the shutdown grace period allotted by the
+    // agent is smaller than the kill grace period).
+    CHECK_SOME(taskId);
+    killTask(driver, taskId.get(), gracePeriod);
+  }
+
+ // Error callback. Intentionally a no-op: the error message is
+ // ignored.
+ virtual void error(ExecutorDriver* driver, const string& message) {}
+
+protected:
+  // Registers the handler for `TaskHealthStatus` messages: the
+  // message's `task_id`, `healthy` and `kill_task` fields are passed,
+  // in that order, to `taskHealthUpdated()`.
+  virtual void initialize()
+  {
+    install<TaskHealthStatus>(
+        &Self::taskHealthUpdated,
+        &TaskHealthStatus::task_id,
+        &TaskHealthStatus::healthy,
+        &TaskHealthStatus::kill_task);
+  }
+
+  // Handles a health report for the task: forwards it as a
+  // TASK_RUNNING status update carrying the `healthy` bit and, if the
+  // report asks for it, starts killing the task. Reports that arrive
+  // before a driver is known are dropped.
+  void taskHealthUpdated(
+      const TaskID& taskID,
+      const bool& healthy,
+      const bool& initiateTaskKill)
+  {
+    if (driver.isNone()) {
+      return;
+    }
+
+    ExecutorDriver* executorDriver = driver.get();
+
+    cout << "Received task health update, healthy: "
+         << stringify(healthy) << endl;
+
+    TaskStatus update;
+    update.mutable_task_id()->CopyFrom(taskID);
+    update.set_state(TASK_RUNNING);
+    update.set_healthy(healthy);
+    executorDriver->sendStatusUpdate(update);
+
+    if (!initiateTaskKill) {
+      return;
+    }
+
+    // Remembered so that `reaped()` can flag the final update as
+    // unhealthy.
+    killedByHealthCheck = true;
+    killTask(executorDriver, taskID);
+  }
+
+private:
+  // Starts killing the task, at most once: optionally announces
+  // TASK_KILLING, sends SIGTERM to the task's process tree and arms a
+  // timer that calls `escalated()` after `gracePeriod`. On every call,
+  // whether or not the kill was started by it, the health check
+  // process (if any) is sent SIGKILL.
+  void killTask(
+      ExecutorDriver* driver,
+      const TaskID& _taskId,
+      const Duration& gracePeriod)
+  {
+    const bool startKill = launched && !killed;
+
+    if (startKill) {
+      CHECK_SOME(frameworkInfo);
+      CHECK_SOME(taskId);
+      CHECK(taskId.get() == _taskId);
+
+      // Send TASK_KILLING only if the framework declared that it can
+      // handle it.
+      bool frameworkHandlesKilling = false;
+      foreach (const FrameworkInfo::Capability& capability,
+               frameworkInfo->capabilities()) {
+        if (capability.type() ==
+            FrameworkInfo::Capability::TASK_KILLING_STATE) {
+          frameworkHandlesKilling = true;
+          break;
+        }
+      }
+
+      if (frameworkHandlesKilling) {
+        TaskStatus killing;
+        killing.mutable_task_id()->CopyFrom(taskId.get());
+        killing.set_state(TASK_KILLING);
+        driver->sendStatusUpdate(killing);
+      }
+
+      // First stage of the signal escalation: SIGTERM.
+      CHECK_GT(pid, 0);
+
+      cout << "Sending SIGTERM to process tree at pid " << pid << endl;
+
+      Try<std::list<os::ProcessTree> > signaled =
+        os::killtree(pid, SIGTERM, true, true);
+
+      if (!signaled.isError()) {
+        cout << "Sent SIGTERM to the following process trees:\n"
+             << stringify(signaled.get()) << endl;
+      } else {
+        cerr << "Failed to kill the process tree rooted at pid " << pid
+             << ": " << signaled.error() << endl;
+
+        // The root process may not have been signaled before
+        // os::killtree() gave up, so signal it directly.
+        ::kill(pid, SIGTERM);
+      }
+
+      // Second stage (SIGKILL) fires unless the task is reaped first.
+      escalationTimer =
+        delay(gracePeriod, self(), &Self::escalated, gracePeriod);
+
+      killed = true;
+    }
+
+    // Cleanup health check process.
+    //
+    // TODO(bmahler): Consider doing this after the task has been
+    // reaped, since a framework may be interested in health
+    // information while the task is being killed (consider a
+    // task that takes 30 minutes to be cleanly killed).
+    if (healthPid != -1) {
+      os::killtree(healthPid, SIGKILL);
+    }
+  }
+
+  // Called when the reaper reports on the task's process. Cancels the
+  // pending SIGKILL escalation, turns the exit status into a terminal
+  // task state (TASK_FINISHED, TASK_KILLED or TASK_FAILED), sends that
+  // update and stops the driver.
+  void reaped(
+      ExecutorDriver* driver,
+      pid_t pid,
+      const Future<Option<int> >& status_)
+  {
+    Clock::cancel(escalationTimer);
+
+    TaskState taskState;
+    string message;
+
+    if (!status_.isReady()) {
+      // The reaper future failed or was discarded.
+      taskState = TASK_FAILED;
+      message = "Failed to get exit status for Command: ";
+      message += status_.isFailed() ? status_.failure() : "future discarded";
+    } else if (status_.get().isNone()) {
+      // The reaper could not provide an exit status.
+      taskState = TASK_FAILED;
+      message = "Failed to get exit status for Command";
+    } else {
+      int status = status_.get().get();
+      CHECK(WIFEXITED(status) || WIFSIGNALED(status)) << status;
+
+      const bool exitedCleanly = WIFEXITED(status) && WEXITSTATUS(status) == 0;
+
+      if (exitedCleanly) {
+        taskState = TASK_FINISHED;
+      } else {
+        // A non-zero exit or a signal counts as TASK_KILLED when it
+        // follows killTask() or shutdown(), and as TASK_FAILED otherwise.
+        taskState = killed ? TASK_KILLED : TASK_FAILED;
+      }
+
+      message = "Command " + WSTRINGIFY(status);
+    }
+
+    cout << message << " (pid: " << pid << ")" << endl;
+
+    CHECK_SOME(taskId);
+
+    TaskStatus finalStatus;
+    finalStatus.mutable_task_id()->MergeFrom(taskId.get());
+    finalStatus.set_state(taskState);
+    finalStatus.set_message(message);
+    if (killed && killedByHealthCheck) {
+      finalStatus.set_healthy(false);
+    }
+
+    driver->sendStatusUpdate(finalStatus);
+
+    // This is a hack to ensure the message is sent to the
+    // slave before we exit the process. Without this, we
+    // may exit before libprocess has sent the data over
+    // the socket. See MESOS-4111.
+    os::sleep(Seconds(1));
+    driver->stop();
+  }
+
+  // Timer callback for the second stage of the kill: the task did not
+  // go away within `timeout` after SIGTERM, so its process tree is
+  // sent SIGKILL.
+  void escalated(Duration timeout)
+  {
+    cout << "Process " << pid << " did not terminate after " << timeout
+         << ", sending SIGKILL to process tree at " << pid << endl;
+
+    // TODO(nnielsen): Sending SIGTERM in the first stage of the
+    // shutdown may leave orphan processes hanging off init. This
+    // scenario will be handled when PID namespace encapsulated
+    // execution is in place.
+    Try<std::list<os::ProcessTree> > killedTrees =
+      os::killtree(pid, SIGKILL, true, true);
+
+    if (!killedTrees.isError()) {
+      cout << "Killed the following process trees:\n"
+           << stringify(killedTrees.get()) << endl;
+      return;
+    }
+
+    cerr << "Failed to kill the process tree rooted at pid "
+         << pid << ": " << killedTrees.error() << endl;
+
+    // Process 'pid' may not have received signal before
+    // os::killtree() failed. To make sure process 'pid' is reaped
+    // we send SIGKILL directly.
+    ::kill(pid, SIGKILL);
+  }
+
+  // Starts the "mesos-health-check" helper found in `healthCheckDir`
+  // for `task`, which must carry a health check. The helper is given
+  // this process' PID, the health check as JSON and the task id, and
+  // its pid is remembered in `healthPid`. A launch failure is logged
+  // and otherwise ignored: the task keeps running without health
+  // checks.
+  void launchHealthCheck(const TaskInfo& task)
+  {
+    CHECK(task.has_health_check());
+
+    JSON::Object json = JSON::protobuf(task.health_check());
+
+    const string healthCheckPath =
+      path::join(healthCheckDir, "mesos-health-check");
+
+    // Launch the subprocess using 'exec' style so that quotes can
+    // be properly handled.
+    vector<string> argv(4);
+    argv[0] = "mesos-health-check";
+    argv[1] = "--executor=" + stringify(self());
+    argv[2] = "--health_check_json=" + stringify(json);
+    argv[3] = "--task_id=" + task.task_id().value();
+
+    cout << "Launching health check process: "
+         << healthCheckPath
+         << " " << argv[1] << " " << argv[2] << " " << argv[3] << endl;
+
+    Try<Subprocess> healthProcess =
+      process::subprocess(
+          healthCheckPath,
+          argv,
+          // Intentionally not sending STDIN to avoid health check
+          // commands that expect STDIN input to block.
+          Subprocess::PATH("/dev/null"),
+          Subprocess::FD(STDOUT_FILENO),
+          Subprocess::FD(STDERR_FILENO));
+
+    if (healthProcess.isError()) {
+      // Terminate the line so that the message does not run into
+      // whatever is logged next.
+      cerr << "Unable to launch health process: " << healthProcess.error()
+           << endl;
+      return;
+    }
+
+    healthPid = healthProcess.get().pid();
+
+    cout << "Health check process launched at pid: "
+         << stringify(healthPid) << endl;
+  }
+
+ // Registration state of the executor, advanced by `registered()` and
+ // `reregistered()` and checked by `launchTask()`.
+ enum State
+ {
+ REGISTERING, // Executor is launched but not (re-)registered yet.
+ REGISTERED, // Executor has (re-)registered.
+ } state;
+
+ // Set at the end of a successful `launchTask()`; a later launch
+ // attempt is rejected while this is true.
+ bool launched;
+ // Set once the SIGTERM stage of the kill has been started, so that
+ // the escalation is only started once.
+ bool killed;
+ // Set when a health update asked for the task to be killed.
+ bool killedByHealthCheck;
+ // Pid of the task's process as reported over the pipe by the child;
+ // -1 until a task has been forked.
+ pid_t pid;
+ // Pid of the health check subprocess; -1 if none was launched.
+ pid_t healthPid;
+ // Budget from which `shutdown()` derives the kill grace period.
+ Duration shutdownGracePeriod;
+ // Kill policy copied from the TaskInfo, if it had one.
+ Option<KillPolicy> killPolicy;
+ // Timer for the SIGKILL escalation; cancelled in `reaped()`.
+ Timer escalationTimer;
+ // Driver and framework info captured in `registered()`.
+ Option<ExecutorDriver*> driver;
+ Option<FrameworkInfo> frameworkInfo;
+ // Id of the task handed to `launchTask()`.
+ Option<TaskID> taskId;
+ // Directory that contains the "mesos-health-check" binary.
+ string healthCheckDir;
+ // Optional argv that replaces the task's command.
+ Option<char**> override;
+ // Where the sandbox is mounted inside the task's root filesystem;
+ // its presence is what makes `launchTask()` set up a rootfs.
+ Option<string> sandboxDirectory;
+ // Directory to chdir into after entering the rootfs (falls back to
+ // `sandboxDirectory`).
+ Option<string> workingDirectory;
+ // User to switch to after entering the rootfs.
+ Option<string> user;
+ // Optional JSON encoded CommandInfo that takes precedence over the
+ // command in the TaskInfo.
+ Option<string> taskCommand;
+};
+
+
+// `Executor` implementation that owns a `CommandExecutorProcess` and
+// forwards every callback to it via `dispatch()`, so that all of the
+// executor's logic runs inside that process.
+class CommandExecutor: public Executor
+{
+public:
+  // Creates and spawns the backing process; all arguments are passed
+  // through unchanged.
+  CommandExecutor(
+      const Option<char**>& override,
+      const string& healthCheckDir,
+      const Option<string>& sandboxDirectory,
+      const Option<string>& workingDirectory,
+      const Option<string>& user,
+      const Option<string>& taskCommand,
+      const Duration& shutdownGracePeriod)
+    : process(new CommandExecutorProcess(
+          override,
+          healthCheckDir,
+          sandboxDirectory,
+          workingDirectory,
+          user,
+          taskCommand,
+          shutdownGracePeriod))
+  {
+    spawn(process);
+  }
+
+  // Terminates the backing process and waits for it before freeing it.
+  virtual ~CommandExecutor()
+  {
+    terminate(process);
+    wait(process);
+    delete process;
+  }
+
+  virtual void registered(
+      ExecutorDriver* driver,
+      const ExecutorInfo& executorInfo,
+      const FrameworkInfo& frameworkInfo,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(
+        process,
+        &CommandExecutorProcess::registered,
+        driver,
+        executorInfo,
+        frameworkInfo,
+        slaveInfo);
+  }
+
+  virtual void reregistered(
+      ExecutorDriver* driver,
+      const SlaveInfo& slaveInfo)
+  {
+    dispatch(
+        process,
+        &CommandExecutorProcess::reregistered,
+        driver,
+        slaveInfo);
+  }
+
+  virtual void disconnected(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::disconnected, driver);
+  }
+
+  virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+  {
+    dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
+  }
+
+  virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
+  {
+    dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
+  }
+
+  virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(
+        process,
+        &CommandExecutorProcess::frameworkMessage,
+        driver,
+        data);
+  }
+
+  virtual void shutdown(ExecutorDriver* driver)
+  {
+    dispatch(process, &CommandExecutorProcess::shutdown, driver);
+  }
+
+  virtual void error(ExecutorDriver* driver, const string& data)
+  {
+    dispatch(process, &CommandExecutorProcess::error, driver, data);
+  }
+
+private:
+  // Owned: created in the constructor, deleted in the destructor.
+  CommandExecutorProcess* process;
+};
+
+} // namespace internal {
+} // namespace mesos {
+
+
+// Command line flags understood by this executor binary. Each flag is
+// registered in the constructor and stored in the member of the same
+// name.
+class Flags : public flags::FlagsBase
+{
+public:
+ Flags()
+ {
+ // TODO(gilbert): Deprecate the 'override' flag since no one is
+ // using it, and it may be confused with the 'task_command' flag.
+ add(&override,
+ "override",
+ "Whether to override the command the executor should run when the\n"
+ "task is launched. Only this flag is expected to be on the command\n"
+ "line and all arguments after the flag will be used as the\n"
+ "subsequent 'argv' to be used with 'execvp'",
+ false);
+
+ // The following flags are only applicable when a rootfs is
+ // provisioned for this command.
+ add(&sandbox_directory,
+ "sandbox_directory",
+ "The absolute path for the directory in the container where the\n"
+ "sandbox is mapped to");
+
+ add(&working_directory,
+ "working_directory",
+ "The working directory for the task in the container.");
+
+ add(&user,
+ "user",
+ "The user that the task should be running as.");
+
+ add(&task_command,
+ "task_command",
+ "If specified, this is the overrided command for launching the\n"
+ "task (instead of the command from TaskInfo).");
+
+ // TODO(nnielsen): Add 'prefix' option to enable replacing
+ // 'sh -c' with user specified wrapper.
+ }
+
+ // Defaults to false (see the `add()` call above).
+ bool override;
+ // The remaining flags have no default and stay unset unless given.
+ Option<string> sandbox_directory;
+ Option<string> working_directory;
+ Option<string> user;
+ Option<string> task_command;
+};
+
+
+// Entry point: parses the flags, works out the optional override
+// argv, the directory of the helper binaries and the shutdown grace
+// period, then runs a `MesosExecutorDriver` around a
+// `CommandExecutor`. Returns EXIT_SUCCESS only if the driver ends up
+// in DRIVER_STOPPED.
+int main(int argc, char** argv)
+{
+  Flags flags;
+
+  // Load flags from command line.
+  Try<Nothing> load = flags.load(None(), &argc, &argv);
+
+  if (load.isError()) {
+    cerr << flags.usage(load.error()) << endl;
+    return EXIT_FAILURE;
+  }
+
+  if (flags.help) {
+    cout << flags.usage() << endl;
+    return EXIT_SUCCESS;
+  }
+
+  // After flags.load(..., &argc, &argv) all flags will have been
+  // stripped from argv. Additionally, arguments after a "--"
+  // terminator will be preserved in argv and it is therefore
+  // possible to pass override and prefix commands which use
+  // "--foobar" style flags.
+  Option<char**> override = None();
+  if (flags.override) {
+    if (argc > 1) {
+      override = argv + 1;
+    }
+  }
+
+  // The helper binaries (e.g. the health checker) are looked up in
+  // MESOS_LAUNCHER_DIR if it is set, and next to this executable
+  // otherwise.
+  const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
+
+  string path;
+  if (envPath.isSome()) {
+    path = envPath.get();
+  } else {
+    // Resolving the directory can fail (or find nothing); report that
+    // instead of aborting on an unchecked `get()`.
+    Result<string> launcherDir = os::realpath(Path(argv[0]).dirname());
+    if (!launcherDir.isSome()) {
+      cerr << "Failed to determine the launcher directory from '"
+           << argv[0] << "': "
+           << (launcherDir.isError()
+                 ? launcherDir.error()
+                 : "No such file or directory")
+           << endl;
+      return EXIT_FAILURE;
+    }
+
+    path = launcherDir.get();
+  }
+
+  // Get executor shutdown grace period from the environment.
+  //
+  // NOTE: We avoided introducing a command executor flag for this
+  // because the command executor exits if it sees an unknown flag.
+  // This makes it difficult to add or remove command executor flags
+  // that are unconditionally set by the agent.
+  Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
+  Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+  if (value.isSome()) {
+    Try<Duration> parse = Duration::parse(value.get());
+    if (parse.isError()) {
+      cerr << "Failed to parse value '" << value.get() << "'"
+           << " of 'MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD': " << parse.error()
+           << endl;
+      return EXIT_FAILURE;
+    }
+
+    shutdownGracePeriod = parse.get();
+  }
+
+  mesos::internal::CommandExecutor executor(
+      override,
+      path,
+      flags.sandbox_directory,
+      flags.working_directory,
+      flags.user,
+      flags.task_command,
+      shutdownGracePeriod);
+
+  mesos::MesosExecutorDriver driver(&executor);
+
+  return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+}