You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/04 23:56:21 UTC
[2/3] mesos git commit: Added a new flag to command executor for
overriding the task command.
Added a new flag to command executor for overriding the task command.
Review: https://reviews.apache.org/r/43082/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9228d91d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9228d91d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9228d91d
Branch: refs/heads/master
Commit: 9228d91d2da0d928f373bec205e0e31deeadfd37
Parents: 21224a7
Author: Gilbert Song <so...@gmail.com>
Authored: Thu Feb 4 13:48:37 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Feb 4 14:56:14 2016 -0800
----------------------------------------------------------------------
src/launcher/executor.cpp | 115 ++++++++++++++++++++++++++++-------------
1 file changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9228d91d/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 356d311..b214a3f 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -39,6 +39,7 @@
#include <stout/duration.hpp>
#include <stout/flags.hpp>
+#include <stout/json.hpp>
#include <stout/lambda.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
@@ -81,7 +82,8 @@ public:
const Option<char**>& override,
const string& _healthCheckDir,
const Option<string>& _sandboxDirectory,
- const Option<string>& _user)
+ const Option<string>& _user,
+ const Option<string>& _taskCommand)
: state(REGISTERING),
launched(false),
killed(false),
@@ -93,7 +95,8 @@ public:
healthCheckDir(_healthCheckDir),
override(override),
sandboxDirectory(_sandboxDirectory),
- user(_user) {}
+ user(_user),
+ taskCommand(_taskCommand) {}
virtual ~CommandExecutorProcess() {}
@@ -137,26 +140,46 @@ public:
return;
}
- // Skip sanity checks for TaskInfo if override is provided since
- // the executor will be running the override command.
- if (override.isNone()) {
- // Sanity checks.
- CHECK(task.has_command()) << "Expecting task " << task.task_id()
- << " to have a command!";
+ // Determine the command to launch the task.
+ CommandInfo command;
+
+ if (taskCommand.isSome()) {
+ // Get CommandInfo from a JSON string.
+ Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
+ if (object.isError()) {
+ cerr << "Failed to parse JSON: " << object.error() << endl;
+ abort();
+ }
+
+ Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+ if (parse.isError()) {
+ cerr << "Failed to parse protobuf: " << parse.error() << endl;
+ abort();
+ }
+
+ command = parse.get();
+ } else if (task.has_command()) {
+ command = task.command();
+ } else {
+ CHECK_SOME(override)
+ << "Expecting task '" << task.task_id()
+ << "' to have a command!";
+ }
+ if (override.isNone()) {
// TODO(jieyu): For now, we just fail the executor if the task's
// CommandInfo is not valid. The framework will receive
// TASK_FAILED for the task, and will most likely find out the
// cause with some debugging. This is a temporary solution. A more
// correct solution is to perform this validation at master side.
- if (task.command().shell()) {
- CHECK(task.command().has_value())
- << "Shell command of task " << task.task_id()
- << " is not specified!";
+ if (command.shell()) {
+ CHECK(command.has_value())
+ << "Shell command of task '" << task.task_id()
+ << "' is not specified!";
} else {
- CHECK(task.command().has_value())
- << "Executable of task " << task.task_id()
- << " is not specified!";
+ CHECK(command.has_value())
+ << "Executable of task '" << task.task_id()
+ << "' is not specified!";
}
}
@@ -241,31 +264,31 @@ public:
}
// Prepare the argv before fork as it's not async signal safe.
- char **argv = new char*[task.command().arguments().size() + 1];
- for (int i = 0; i < task.command().arguments().size(); i++) {
- argv[i] = (char*) task.command().arguments(i).c_str();
+ char **argv = new char*[command.arguments().size() + 1];
+ for (int i = 0; i < command.arguments().size(); i++) {
+ argv[i] = (char*) command.arguments(i).c_str();
}
- argv[task.command().arguments().size()] = NULL;
+ argv[command.arguments().size()] = NULL;
// Prepare the command log message.
- string command;
+ string commandString;
if (override.isSome()) {
char** argv = override.get();
// argv is guaranteed to be NULL terminated and we rely on
// that fact to print command to be executed.
for (int i = 0; argv[i] != NULL; i++) {
- command += string(argv[i]) + " ";
+ commandString += string(argv[i]) + " ";
}
- } else if (task.command().shell()) {
- command = "sh -c '" + task.command().value() + "'";
+ } else if (command.shell()) {
+ commandString = "sh -c '" + command.value() + "'";
} else {
- command =
- "[" + task.command().value() + ", " +
- strings::join(", ", task.command().arguments()) + "]";
+ commandString =
+ "[" + command.value() + ", " +
+ strings::join(", ", command.arguments()) + "]";
}
if ((pid = fork()) == -1) {
- cerr << "Failed to fork to run " << command << ": "
+ cerr << "Failed to fork to run " << commandString << ": "
<< os::strerror(errno) << endl;
abort();
}
@@ -344,19 +367,19 @@ public:
}
- cout << command << endl;
+ cout << commandString << endl;
// The child has successfully setsid, now run the command.
if (override.isNone()) {
- if (task.command().shell()) {
+ if (command.shell()) {
execlp(
"sh",
"sh",
"-c",
- task.command().value().c_str(),
+ command.value().c_str(),
(char*) NULL);
} else {
- execvp(task.command().value().c_str(), argv);
+ execvp(command.value().c_str(), argv);
}
} else {
char** argv = override.get();
@@ -624,6 +647,7 @@ private:
Option<char**> override;
Option<string> sandboxDirectory;
Option<string> user;
+ Option<string> taskCommand;
};
@@ -634,10 +658,11 @@ public:
const Option<char**>& override,
const string& healthCheckDir,
const Option<string>& sandboxDirectory,
- const Option<string>& user)
+ const Option<string>& user,
+ const Option<string>& taskCommand)
{
process = new CommandExecutorProcess(
- override, healthCheckDir, sandboxDirectory, user);
+ override, healthCheckDir, sandboxDirectory, user, taskCommand);
spawn(process);
}
@@ -715,6 +740,8 @@ class Flags : public flags::FlagsBase
public:
Flags()
{
+ // TODO(gilbert): Deprecate the 'override' flag since no one is
+ // using it, and it may cause confusing with 'task_command' flag.
add(&override,
"override",
"Whether to override the command the executor should run when the\n"
@@ -734,6 +761,11 @@ public:
"user",
"The user that the task should be running as.");
+ add(&task_command,
+ "task_command",
+ "If specified, this is the overrided command for launching the\n"
+ "task (instead of the command from TaskInfo).");
+
// TODO(nnielsen): Add 'prefix' option to enable replacing
// 'sh -c' with user specified wrapper.
}
@@ -741,6 +773,7 @@ public:
bool override;
Option<string> sandbox_directory;
Option<string> user;
+ Option<string> task_command;
};
@@ -774,11 +807,19 @@ int main(int argc, char** argv)
}
const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
- string path =
- envPath.isSome() ? envPath.get()
- : os::realpath(Path(argv[0]).dirname()).get();
+
+ string path = envPath.isSome()
+ ? envPath.get()
+ : os::realpath(Path(argv[0]).dirname()).get();
+
mesos::internal::CommandExecutor executor(
- override, path, flags.sandbox_directory, flags.user);
+ override,
+ path,
+ flags.sandbox_directory,
+ flags.user,
+ flags.task_command);
+
mesos::MesosExecutorDriver driver(&executor);
+
return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
}