You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2016/02/04 23:56:21 UTC

[2/3] mesos git commit: Added a new flag to command executor for overriding the task command.

Added a new flag to command executor for overriding the task command.

Review: https://reviews.apache.org/r/43082/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9228d91d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9228d91d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9228d91d

Branch: refs/heads/master
Commit: 9228d91d2da0d928f373bec205e0e31deeadfd37
Parents: 21224a7
Author: Gilbert Song <so...@gmail.com>
Authored: Thu Feb 4 13:48:37 2016 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Feb 4 14:56:14 2016 -0800

----------------------------------------------------------------------
 src/launcher/executor.cpp | 115 ++++++++++++++++++++++++++++-------------
 1 file changed, 78 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9228d91d/src/launcher/executor.cpp
----------------------------------------------------------------------
diff --git a/src/launcher/executor.cpp b/src/launcher/executor.cpp
index 356d311..b214a3f 100644
--- a/src/launcher/executor.cpp
+++ b/src/launcher/executor.cpp
@@ -39,6 +39,7 @@
 
 #include <stout/duration.hpp>
 #include <stout/flags.hpp>
+#include <stout/json.hpp>
 #include <stout/lambda.hpp>
 #include <stout/option.hpp>
 #include <stout/os.hpp>
@@ -81,7 +82,8 @@ public:
       const Option<char**>& override,
       const string& _healthCheckDir,
       const Option<string>& _sandboxDirectory,
-      const Option<string>& _user)
+      const Option<string>& _user,
+      const Option<string>& _taskCommand)
     : state(REGISTERING),
       launched(false),
       killed(false),
@@ -93,7 +95,8 @@ public:
       healthCheckDir(_healthCheckDir),
       override(override),
       sandboxDirectory(_sandboxDirectory),
-      user(_user) {}
+      user(_user),
+      taskCommand(_taskCommand) {}
 
   virtual ~CommandExecutorProcess() {}
 
@@ -137,26 +140,46 @@ public:
       return;
     }
 
-    // Skip sanity checks for TaskInfo if override is provided since
-    // the executor will be running the override command.
-    if (override.isNone()) {
-      // Sanity checks.
-      CHECK(task.has_command()) << "Expecting task " << task.task_id()
-                                << " to have a command!";
+    // Determine the command to launch the task.
+    CommandInfo command;
+
+    if (taskCommand.isSome()) {
+      // Get CommandInfo from a JSON string.
+      Try<JSON::Object> object = JSON::parse<JSON::Object>(taskCommand.get());
+      if (object.isError()) {
+        cerr << "Failed to parse JSON: " << object.error() << endl;
+        abort();
+      }
+
+      Try<CommandInfo> parse = protobuf::parse<CommandInfo>(object.get());
+      if (parse.isError()) {
+        cerr << "Failed to parse protobuf: " << parse.error() << endl;
+        abort();
+      }
+
+      command = parse.get();
+    } else if (task.has_command()) {
+      command = task.command();
+    } else {
+      CHECK_SOME(override)
+        << "Expecting task '" << task.task_id()
+        << "' to have a command!";
+    }
 
+    if (override.isNone()) {
       // TODO(jieyu): For now, we just fail the executor if the task's
       // CommandInfo is not valid. The framework will receive
       // TASK_FAILED for the task, and will most likely find out the
       // cause with some debugging. This is a temporary solution. A more
       // correct solution is to perform this validation at master side.
-      if (task.command().shell()) {
-        CHECK(task.command().has_value())
-          << "Shell command of task " << task.task_id()
-          << " is not specified!";
+      if (command.shell()) {
+        CHECK(command.has_value())
+          << "Shell command of task '" << task.task_id()
+          << "' is not specified!";
       } else {
-        CHECK(task.command().has_value())
-          << "Executable of task " << task.task_id()
-          << " is not specified!";
+        CHECK(command.has_value())
+          << "Executable of task '" << task.task_id()
+          << "' is not specified!";
       }
     }
 
@@ -241,31 +264,31 @@ public:
     }
 
     // Prepare the argv before fork as it's not async signal safe.
-    char **argv = new char*[task.command().arguments().size() + 1];
-    for (int i = 0; i < task.command().arguments().size(); i++) {
-      argv[i] = (char*) task.command().arguments(i).c_str();
+    char **argv = new char*[command.arguments().size() + 1];
+    for (int i = 0; i < command.arguments().size(); i++) {
+      argv[i] = (char*) command.arguments(i).c_str();
     }
-    argv[task.command().arguments().size()] = NULL;
+    argv[command.arguments().size()] = NULL;
 
     // Prepare the command log message.
-    string command;
+    string commandString;
     if (override.isSome()) {
       char** argv = override.get();
       // argv is guaranteed to be NULL terminated and we rely on
       // that fact to print command to be executed.
       for (int i = 0; argv[i] != NULL; i++) {
-        command += string(argv[i]) + " ";
+        commandString += string(argv[i]) + " ";
       }
-    } else if (task.command().shell()) {
-      command = "sh -c '" + task.command().value() + "'";
+    } else if (command.shell()) {
+      commandString = "sh -c '" + command.value() + "'";
     } else {
-      command =
-        "[" + task.command().value() + ", " +
-        strings::join(", ", task.command().arguments()) + "]";
+      commandString =
+        "[" + command.value() + ", " +
+        strings::join(", ", command.arguments()) + "]";
     }
 
     if ((pid = fork()) == -1) {
-      cerr << "Failed to fork to run " << command << ": "
+      cerr << "Failed to fork to run " << commandString << ": "
            << os::strerror(errno) << endl;
       abort();
     }
@@ -344,19 +367,19 @@ public:
       }
 
 
-      cout << command << endl;
+      cout << commandString << endl;
 
       // The child has successfully setsid, now run the command.
       if (override.isNone()) {
-        if (task.command().shell()) {
+        if (command.shell()) {
           execlp(
               "sh",
               "sh",
               "-c",
-              task.command().value().c_str(),
+              command.value().c_str(),
               (char*) NULL);
         } else {
-          execvp(task.command().value().c_str(), argv);
+          execvp(command.value().c_str(), argv);
         }
       } else {
         char** argv = override.get();
@@ -624,6 +647,7 @@ private:
   Option<char**> override;
   Option<string> sandboxDirectory;
   Option<string> user;
+  Option<string> taskCommand;
 };
 
 
@@ -634,10 +658,11 @@ public:
       const Option<char**>& override,
       const string& healthCheckDir,
       const Option<string>& sandboxDirectory,
-      const Option<string>& user)
+      const Option<string>& user,
+      const Option<string>& taskCommand)
   {
     process = new CommandExecutorProcess(
-        override, healthCheckDir, sandboxDirectory, user);
+        override, healthCheckDir, sandboxDirectory, user, taskCommand);
     spawn(process);
   }
 
@@ -715,6 +740,8 @@ class Flags : public flags::FlagsBase
 public:
   Flags()
   {
+    // TODO(gilbert): Deprecate the 'override' flag since no one is
+    // using it, and it may cause confusion with the 'task_command' flag.
     add(&override,
         "override",
         "Whether to override the command the executor should run when the\n"
@@ -734,6 +761,11 @@ public:
         "user",
         "The user that the task should be running as.");
 
+    add(&task_command,
+        "task_command",
+        "If specified, this is the overrided command for launching the\n"
+        "task (instead of the command from TaskInfo).");
+
     // TODO(nnielsen): Add 'prefix' option to enable replacing
     // 'sh -c' with user specified wrapper.
   }
@@ -741,6 +773,7 @@ public:
   bool override;
   Option<string> sandbox_directory;
   Option<string> user;
+  Option<string> task_command;
 };
 
 
@@ -774,11 +807,19 @@ int main(int argc, char** argv)
   }
 
   const Option<string> envPath = os::getenv("MESOS_LAUNCHER_DIR");
-  string path =
-    envPath.isSome() ? envPath.get()
-                     : os::realpath(Path(argv[0]).dirname()).get();
+
+  string path = envPath.isSome()
+    ? envPath.get()
+    : os::realpath(Path(argv[0]).dirname()).get();
+
   mesos::internal::CommandExecutor executor(
-      override, path, flags.sandbox_directory, flags.user);
+      override,
+      path,
+      flags.sandbox_directory,
+      flags.user,
+      flags.task_command);
+
   mesos::MesosExecutorDriver driver(&executor);
+
   return driver.run() == mesos::DRIVER_STOPPED ? EXIT_SUCCESS : EXIT_FAILURE;
 }