You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by vi...@apache.org on 2016/04/14 01:27:49 UTC
[2/4] mesos git commit: Updated http_command_executor.cpp to use v1 API.
Updated http_command_executor.cpp to use v1 API.
Review: https://reviews.apache.org/r/44424/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bec4d711
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bec4d711
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bec4d711
Branch: refs/heads/master
Commit: bec4d711c3fc190138d100023bba9e3fe087052e
Parents: ed30403
Author: Qian Zhang <zh...@cn.ibm.com>
Authored: Wed Apr 13 15:55:25 2016 -0700
Committer: Vinod Kone <vi...@gmail.com>
Committed: Wed Apr 13 16:27:39 2016 -0700
----------------------------------------------------------------------
src/launcher/http_command_executor.cpp | 539 +++++++++++++++-------------
1 file changed, 295 insertions(+), 244 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/bec4d711/src/launcher/http_command_executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/http_command_executor.cpp b/src/launcher/http_command_executor.cpp
index 7677391..ad484e0 100644
--- a/src/launcher/http_command_executor.cpp
+++ b/src/launcher/http_command_executor.cpp
@@ -24,7 +24,9 @@
#include <string>
#include <vector>
-#include <mesos/executor.hpp>
+#include <mesos/v1/executor.hpp>
+#include <mesos/v1/mesos.hpp>
+
#include <mesos/type_utils.hpp>
#include <process/defer.hpp>
@@ -41,6 +43,7 @@
#include <stout/flags.hpp>
#include <stout/json.hpp>
#include <stout/lambda.hpp>
+#include <stout/linkedhashmap.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
@@ -50,6 +53,8 @@
#include "common/http.hpp"
#include "common/status_utils.hpp"
+#include "internal/evolve.hpp"
+
#ifdef __linux__
#include "linux/fs.hpp"
#endif
@@ -60,101 +65,230 @@
#include "slave/constants.hpp"
-using namespace mesos::internal::slave;
+#ifdef __linux__
+namespace fs = mesos::internal::fs;
+#endif
-using process::wait; // Necessary on some OS's to disambiguate.
+using namespace mesos::internal::slave;
using std::cout;
using std::cerr;
using std::endl;
+using std::queue;
using std::string;
using std::vector;
+using process::Clock;
+using process::Future;
+using process::Owned;
+using process::Subprocess;
+using process::Timer;
+
+using mesos::internal::evolve;
+using mesos::internal::TaskHealthStatus;
+
+using mesos::v1::ExecutorID;
+using mesos::v1::FrameworkID;
+
+using mesos::v1::executor::Call;
+using mesos::v1::executor::Event;
+using mesos::v1::executor::Mesos;
+
namespace mesos {
+namespace v1 {
namespace internal {
-using namespace process;
-
-class CommandExecutorProcess : public ProtobufProcess<CommandExecutorProcess>
+class HttpCommandExecutor: public ProtobufProcess<HttpCommandExecutor>
{
public:
- CommandExecutorProcess(
- const Option<char**>& override,
+ HttpCommandExecutor(
+ const Option<char**>& _override,
const string& _healthCheckDir,
const Option<string>& _sandboxDirectory,
const Option<string>& _workingDirectory,
const Option<string>& _user,
const Option<string>& _taskCommand,
+ const FrameworkID& _frameworkId,
+ const ExecutorID& _executorId,
const Duration& _shutdownGracePeriod)
- : state(REGISTERING),
- launched(false),
- killed(false),
- killedByHealthCheck(false),
- pid(-1),
- healthPid(-1),
- shutdownGracePeriod(_shutdownGracePeriod),
- driver(None()),
- frameworkInfo(None()),
- taskId(None()),
- healthCheckDir(_healthCheckDir),
- override(override),
- sandboxDirectory(_sandboxDirectory),
- workingDirectory(_workingDirectory),
- user(_user),
- taskCommand(_taskCommand) {}
-
- virtual ~CommandExecutorProcess() {}
-
- void registered(
- ExecutorDriver* _driver,
- const ExecutorInfo& _executorInfo,
- const FrameworkInfo& _frameworkInfo,
- const SlaveInfo& slaveInfo)
+ : state(DISCONNECTED),
+ launched(false),
+ killed(false),
+ killedByHealthCheck(false),
+ pid(-1),
+ healthPid(-1),
+ shutdownGracePeriod(_shutdownGracePeriod),
+ frameworkInfo(None()),
+ taskId(None()),
+ healthCheckDir(_healthCheckDir),
+ override(_override),
+ sandboxDirectory(_sandboxDirectory),
+ workingDirectory(_workingDirectory),
+ user(_user),
+ taskCommand(_taskCommand),
+ frameworkId(_frameworkId),
+ executorId(_executorId),
+ task(None()) {}
+
+ virtual ~HttpCommandExecutor() = default;
+
+ void connected()
{
- CHECK_EQ(REGISTERING, state);
+ state = CONNECTED;
+
+ doReliableRegistration();
+ }
+
+ void disconnected()
+ {
+ state = DISCONNECTED;
+ }
+
+ void received(queue<Event> events)
+ {
+ while (!events.empty()) {
+ Event event = events.front();
+ events.pop();
+
+ cout << "Received " << event.type() << " event" << endl;
+
+ switch (event.type()) {
+ case Event::SUBSCRIBED: {
+ cout << "Subscribed executor on "
+ << event.subscribed().agent_info().hostname() << endl;
+
+ frameworkInfo = event.subscribed().framework_info();
+ state = SUBSCRIBED;
+ break;
+ }
- cout << "Registered executor on " << slaveInfo.hostname() << endl;
+ case Event::LAUNCH: {
+ launch(event.launch().task());
+ break;
+ }
- driver = _driver;
- frameworkInfo = _frameworkInfo;
+ case Event::KILL: {
+ kill(event.kill().task_id());
+ break;
+ }
- state = REGISTERED;
+ case Event::ACKNOWLEDGED: {
+ // Remove the corresponding update.
+ updates.erase(UUID::fromBytes(event.acknowledged().uuid()));
+
+ // Remove the corresponding task.
+ task = None();
+ break;
+ }
+
+ case Event::SHUTDOWN: {
+ shutdown();
+ break;
+ }
+
+ case Event::MESSAGE: {
+ break;
+ }
+
+ case Event::ERROR: {
+ cerr << "Error: " << event.error().message() << endl;
+ }
+
+ default: {
+ UNREACHABLE();
+ }
+ }
+ }
}
- void reregistered(
- ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
+protected:
+ virtual void initialize()
{
- CHECK(state == REGISTERED || state == REGISTERING) << state;
+ // TODO(qianzhang): Currently, the `mesos-health-check` binary can only
+ // send unversioned `TaskHealthStatus` messages. This needs to be revisited
+ // as part of MESOS-5103.
+ install<TaskHealthStatus>(
+ &HttpCommandExecutor::taskHealthUpdated,
+ &TaskHealthStatus::task_id,
+ &TaskHealthStatus::healthy,
+ &TaskHealthStatus::kill_task);
- cout << "Re-registered executor on " << slaveInfo.hostname() << endl;
+ // We initialize the library here to ensure that callbacks are only invoked
+ // after the process has spawned.
+ mesos.reset(new Mesos(
+ mesos::ContentType::PROTOBUF,
+ defer(self(), &Self::connected),
+ defer(self(), &Self::disconnected),
+ defer(self(), &Self::received, lambda::_1)));
+ }
- state = REGISTERED;
+ void taskHealthUpdated(
+ const mesos::TaskID& taskID,
+ const bool healthy,
+ const bool initiateTaskKill)
+ {
+ cout << "Received task health update, healthy: "
+ << stringify(healthy) << endl;
+
+ update(evolve(taskID), TASK_RUNNING, healthy);
+
+ if (initiateTaskKill) {
+ killedByHealthCheck = true;
+ kill(evolve(taskID));
+ }
}
- void disconnected(ExecutorDriver* driver) {}
+ void doReliableRegistration()
+ {
+ if (state == SUBSCRIBED || state == DISCONNECTED) {
+ return;
+ }
+
+ Call call;
+ call.set_type(Call::SUBSCRIBE);
+
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
- void launchTask(ExecutorDriver* driver, const TaskInfo& task)
+ Call::Subscribe* subscribe = call.mutable_subscribe();
+
+ // Send all unacknowledged updates.
+ foreach (const Call::Update& update, updates.values()) {
+ subscribe->add_unacknowledged_updates()->MergeFrom(update);
+ }
+
+ // Send the unacknowledged task.
+ if (task.isSome()) {
+ subscribe->add_unacknowledged_tasks()->MergeFrom(task.get());
+ }
+
+ mesos->send(call);
+
+ delay(Seconds(1), self(), &Self::doReliableRegistration);
+ }
+
+ void launch(const TaskInfo& _task)
{
- CHECK_EQ(REGISTERED, state);
+ CHECK_EQ(SUBSCRIBED, state);
if (launched) {
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_FAILED);
- status.set_message(
+ update(
+ _task.task_id(),
+ TASK_FAILED,
+ None(),
"Attempted to run multiple tasks using a \"command\" executor");
-
- driver->sendStatusUpdate(status);
return;
}
+ // Capture the task.
+ task = _task;
+
// Capture the TaskID.
- taskId = task.task_id();
+ taskId = task->task_id();
// Capture the kill policy.
- if (task.has_kill_policy()) {
- killPolicy = task.kill_policy();
+ if (task->has_kill_policy()) {
+ killPolicy = task->kill_policy();
}
// Determine the command to launch the task.
@@ -175,11 +309,11 @@ public:
}
command = parse.get();
- } else if (task.has_command()) {
- command = task.command();
+ } else if (task->has_command()) {
+ command = task->command();
} else {
CHECK_SOME(override)
- << "Expecting task '" << task.task_id()
+ << "Expecting task '" << task->task_id()
<< "' to have a command!";
}
@@ -191,16 +325,16 @@ public:
// correct solution is to perform this validation at master side.
if (command.shell()) {
CHECK(command.has_value())
- << "Shell command of task '" << task.task_id()
+ << "Shell command of task '" << task->task_id()
<< "' is not specified!";
} else {
CHECK(command.has_value())
- << "Executable of task '" << task.task_id()
+ << "Executable of task '" << task->task_id()
<< "' is not specified!";
}
}
- cout << "Starting task " << task.task_id() << endl;
+ cout << "Starting task " << task->task_id() << endl;
// TODO(benh): Clean this up with the new 'Fork' abstraction.
// Use pipes to determine which child has successfully changed
@@ -448,25 +582,22 @@ public:
cout << "Forked command at " << pid << endl;
- if (task.has_health_check()) {
- launchHealthCheck(task);
+ if (task->has_health_check()) {
+ launchHealthCheck(task.get());
}
// Monitor this process.
process::reap(pid)
- .onAny(defer(self(), &Self::reaped, driver, pid, lambda::_1));
+ .onAny(defer(self(), &Self::reaped, pid, lambda::_1));
- TaskStatus status;
- status.mutable_task_id()->MergeFrom(task.task_id());
- status.set_state(TASK_RUNNING);
- driver->sendStatusUpdate(status);
+ update(task->task_id(), TASK_RUNNING);
launched = true;
}
- void killTask(ExecutorDriver* driver, const TaskID& taskId)
+ void kill(const TaskID& taskId)
{
- cout << "Received killTask for task " << taskId.value() << endl;
+ cout << "Received kill for task " << taskId.value() << endl;
// Default grace period is set to 3s for backwards compatibility.
//
@@ -478,12 +609,10 @@ public:
gracePeriod = Nanoseconds(killPolicy->grace_period().nanoseconds());
}
- killTask(driver, taskId, gracePeriod);
+ kill(taskId, gracePeriod);
}
- void frameworkMessage(ExecutorDriver* driver, const string& data) {}
-
- void shutdown(ExecutorDriver* driver)
+ void shutdown()
{
cout << "Shutting down" << endl;
@@ -504,53 +633,12 @@ public:
// agent is smaller than the kill grace period).
if (launched) {
CHECK_SOME(taskId);
- killTask(driver, taskId.get(), gracePeriod);
- } else {
- driver->stop();
- }
- }
-
- virtual void error(ExecutorDriver* driver, const string& message) {}
-
-protected:
- virtual void initialize()
- {
- install<TaskHealthStatus>(
- &CommandExecutorProcess::taskHealthUpdated,
- &TaskHealthStatus::task_id,
- &TaskHealthStatus::healthy,
- &TaskHealthStatus::kill_task);
- }
-
- void taskHealthUpdated(
- const TaskID& taskID,
- const bool& healthy,
- const bool& initiateTaskKill)
- {
- if (driver.isNone()) {
- return;
- }
-
- cout << "Received task health update, healthy: "
- << stringify(healthy) << endl;
-
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskID);
- status.set_healthy(healthy);
- status.set_state(TASK_RUNNING);
- driver.get()->sendStatusUpdate(status);
-
- if (initiateTaskKill) {
- killedByHealthCheck = true;
- killTask(driver.get(), taskID);
+ kill(taskId.get(), gracePeriod);
}
}
private:
- void killTask(
- ExecutorDriver* driver,
- const TaskID& _taskId,
- const Duration& gracePeriod)
+ void kill(const TaskID& _taskId, const Duration& gracePeriod)
{
if (launched && !killed) {
// Send TASK_KILLING if the framework can handle it.
@@ -561,10 +649,7 @@ private:
foreach (const FrameworkInfo::Capability& c,
frameworkInfo->capabilities()) {
if (c.type() == FrameworkInfo::Capability::TASK_KILLING_STATE) {
- TaskStatus status;
- status.mutable_task_id()->CopyFrom(taskId.get());
- status.set_state(TASK_KILLING);
- driver->sendStatusUpdate(status);
+ update(taskId.get(), TASK_KILLING);
break;
}
}
@@ -606,10 +691,7 @@ private:
}
}
- void reaped(
- ExecutorDriver* driver,
- pid_t pid,
- const Future<Option<int> >& status_)
+ void reaped(pid_t pid, const Future<Option<int> >& status_)
{
TaskState taskState;
string message;
@@ -632,7 +714,7 @@ private:
taskState = TASK_FINISHED;
} else if (killed) {
// Send TASK_KILLED if the task was killed as a result of
- // killTask() or shutdown().
+ // kill() or shutdown().
taskState = TASK_KILLED;
} else {
taskState = TASK_FAILED;
@@ -645,22 +727,17 @@ private:
CHECK_SOME(taskId);
- TaskStatus taskStatus;
- taskStatus.mutable_task_id()->MergeFrom(taskId.get());
- taskStatus.set_state(taskState);
- taskStatus.set_message(message);
if (killed && killedByHealthCheck) {
- taskStatus.set_healthy(false);
+ update(taskId.get(), taskState, false, message);
+ } else {
+ update(taskId.get(), taskState, None(), message);
}
- driver->sendStatusUpdate(taskStatus);
-
- // This is a hack to ensure the message is sent to the
- // slave before we exit the process. Without this, we
- // may exit before libprocess has sent the data over
- // the socket. See MESOS-4111.
+ // TODO(qianzhang): Remove this hack since the executor now receives
+ // acknowledgements for status updates. The executor can terminate
+ // after it receives an ACK for a terminal status update.
os::sleep(Seconds(1));
- driver->stop();
+ terminate(self());
}
void escalated(Duration timeout)
@@ -728,10 +805,49 @@ private:
<< stringify(healthPid) << endl;
}
+ void update(
+ const TaskID& taskID,
+ const TaskState& state,
+ const Option<bool>& healthy = None(),
+ const Option<string>& message = None())
+ {
+ UUID uuid = UUID::random();
+
+ TaskStatus status;
+ status.mutable_task_id()->CopyFrom(taskID);
+ status.mutable_executor_id()->CopyFrom(executorId);
+
+ status.set_state(state);
+ status.set_source(TaskStatus::SOURCE_EXECUTOR);
+ status.set_uuid(uuid.toBytes());
+
+ if (healthy.isSome()) {
+ status.set_healthy(healthy.get());
+ }
+
+ if (message.isSome()) {
+ status.set_message(message.get());
+ }
+
+ Call call;
+ call.set_type(Call::UPDATE);
+
+ call.mutable_framework_id()->CopyFrom(frameworkId);
+ call.mutable_executor_id()->CopyFrom(executorId);
+
+ call.mutable_update()->mutable_status()->CopyFrom(status);
+
+ // Capture the status update.
+ updates[uuid] = call.update();
+
+ mesos->send(call);
+ }
+
enum State
{
- REGISTERING, // Executor is launched but not (re-)registered yet.
- REGISTERED, // Executor has (re-)registered.
+ CONNECTED,
+ DISCONNECTED,
+ SUBSCRIBED
} state;
bool launched;
@@ -742,7 +858,6 @@ private:
Duration shutdownGracePeriod;
Option<KillPolicy> killPolicy;
Timer escalationTimer;
- Option<ExecutorDriver*> driver;
Option<FrameworkInfo> frameworkInfo;
Option<TaskID> taskId;
string healthCheckDir;
@@ -751,99 +866,15 @@ private:
Option<string> workingDirectory;
Option<string> user;
Option<string> taskCommand;
-};
-
-
-class CommandExecutor: public Executor
-{
-public:
- CommandExecutor(
- const Option<char**>& override,
- const string& healthCheckDir,
- const Option<string>& sandboxDirectory,
- const Option<string>& workingDirectory,
- const Option<string>& user,
- const Option<string>& taskCommand,
- const Duration& shutdownGracePeriod)
- {
- process = new CommandExecutorProcess(
- override,
- healthCheckDir,
- sandboxDirectory,
- workingDirectory,
- user,
- taskCommand,
- shutdownGracePeriod);
-
- spawn(process);
- }
-
- virtual ~CommandExecutor()
- {
- terminate(process);
- wait(process);
- delete process;
- }
-
- virtual void registered(
- ExecutorDriver* driver,
- const ExecutorInfo& executorInfo,
- const FrameworkInfo& frameworkInfo,
- const SlaveInfo& slaveInfo)
- {
- dispatch(process,
- &CommandExecutorProcess::registered,
- driver,
- executorInfo,
- frameworkInfo,
- slaveInfo);
- }
-
- virtual void reregistered(
- ExecutorDriver* driver,
- const SlaveInfo& slaveInfo)
- {
- dispatch(process,
- &CommandExecutorProcess::reregistered,
- driver,
- slaveInfo);
- }
-
- virtual void disconnected(ExecutorDriver* driver)
- {
- dispatch(process, &CommandExecutorProcess::disconnected, driver);
- }
-
- virtual void launchTask(ExecutorDriver* driver, const TaskInfo& task)
- {
- dispatch(process, &CommandExecutorProcess::launchTask, driver, task);
- }
-
- virtual void killTask(ExecutorDriver* driver, const TaskID& taskId)
- {
- dispatch(process, &CommandExecutorProcess::killTask, driver, taskId);
- }
-
- virtual void frameworkMessage(ExecutorDriver* driver, const string& data)
- {
- dispatch(process, &CommandExecutorProcess::frameworkMessage, driver, data);
- }
-
- virtual void shutdown(ExecutorDriver* driver)
- {
- dispatch(process, &CommandExecutorProcess::shutdown, driver);
- }
-
- virtual void error(ExecutorDriver* driver, const string& data)
- {
- dispatch(process, &CommandExecutorProcess::error, driver, data);
- }
-
-private:
- CommandExecutorProcess* process;
+ const FrameworkID frameworkId;
+ const ExecutorID executorId;
+ Owned<Mesos> mesos;
+ LinkedHashMap<UUID, Call::Update> updates; // Unacknowledged updates.
+ Option<TaskInfo> task; // Unacknowledged task.
};
} // namespace internal {
+} // namespace v1 {
} // namespace mesos {
@@ -897,6 +928,8 @@ public:
int main(int argc, char** argv)
{
Flags flags;
+ FrameworkID frameworkId;
+ ExecutorID executorId;
// Load flags from command line.
Try<Nothing> load = flags.load(None(), &argc, &argv);
@@ -929,6 +962,20 @@ int main(int argc, char** argv)
? envPath.get()
: os::realpath(Path(argv[0]).dirname()).get();
+ Option<string> value = os::getenv("MESOS_FRAMEWORK_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_FRAMEWORK_ID' to be set in the environment";
+ }
+ frameworkId.set_value(value.get());
+
+ value = os::getenv("MESOS_EXECUTOR_ID");
+ if (value.isNone()) {
+ EXIT(EXIT_FAILURE)
+ << "Expecting 'MESOS_EXECUTOR_ID' to be set in the environment";
+ }
+ executorId.set_value(value.get());
+
// Get executor shutdown grace period from the environment.
//
// NOTE: We avoided introducing a command executor flag for this
@@ -936,7 +983,7 @@ int main(int argc, char** argv)
// This makes it difficult to add or remove command executor flags
// that are unconditionally set by the agent.
Duration shutdownGracePeriod = DEFAULT_EXECUTOR_SHUTDOWN_GRACE_PERIOD;
- Option<string> value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
+ value = os::getenv("MESOS_EXECUTOR_SHUTDOWN_GRACE_PERIOD");
if (value.isSome()) {
Try<Duration> parse = Duration::parse(value.get());
if (parse.isError()) {
@@ -948,16 +995,20 @@ int main(int argc, char** argv)
shutdownGracePeriod = parse.get();
}
- mesos::internal::CommandExecutor executor(
- override,
- path,
- flags.sandbox_directory,
- flags.working_directory,
- flags.user,
- flags.task_command,
- shutdownGracePeriod);
-
- mesos::MesosExecutorDriver driver(&executor);
-
- return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
+ Owned<mesos::v1::internal::HttpCommandExecutor> executor(
+ new mesos::v1::internal::HttpCommandExecutor(
+ override,
+ path,
+ flags.sandbox_directory,
+ flags.working_directory,
+ flags.user,
+ flags.task_command,
+ frameworkId,
+ executorId,
+ shutdownGracePeriod));
+
+ process::spawn(executor.get());
+ process::wait(executor.get());
+
+ return EXIT_SUCCESS;
}